You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/07 16:11:14 UTC

[2/2] flink git commit: [FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell. Update Scala 2.11 version and jline dependency.

[FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell.
Update Scala 2.11 version and jline dependency.

This closes #1197


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d62033c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d62033c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d62033c

Branch: refs/heads/master
Commit: 8d62033c23f50ac1c8ccca04b70c4fda1b8ba46c
Parents: a437a2b
Author: Chiwan Park <ch...@apache.org>
Authored: Sat Sep 26 02:47:06 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 7 12:57:40 2015 +0200

----------------------------------------------------------------------
 flink-dist/pom.xml                              |  29 +--
 flink-staging/flink-scala-shell/pom.xml         |  13 +-
 .../apache/flink/api/scala/ILoopCompat.scala    |  29 +++
 .../apache/flink/api/scala/ILoopCompat.scala    |  31 +++
 .../org.apache.flink/api/scala/FlinkILoop.scala | 218 ------------------
 .../org.apache.flink/api/scala/FlinkShell.scala | 108 ---------
 .../org/apache/flink/api/scala/FlinkILoop.scala | 224 +++++++++++++++++++
 .../org/apache/flink/api/scala/FlinkShell.scala | 107 +++++++++
 .../flink/api/scala/ScalaShellITSuite.scala     |   2 +-
 flink-staging/pom.xml                           |  18 +-
 pom.xml                                         |   2 +-
 11 files changed, 407 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 32059ea..f1745ed 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -125,34 +125,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala-shell</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
 		<profile>
-			<id>scala-2.10</id>
-			<activation>
-
-				<property>
-					<!-- this is the default scala profile -->
-					<name>!scala-2.11</name>
-				</property>
-			</activation>
-
-			<properties>
-				<scala.version>2.10.4</scala.version>
-				<scala.binary.version>2.10</scala.binary.version>
-			</properties>
-
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-scala-shell</artifactId>
-					<version>${project.version}</version>
-				</dependency>
-			</dependencies>
-		</profile>
-		<profile>
 			<id>include-yarn</id>
 			<activation>
 				<property>

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
index 94718a3..5adb8c6 100644
--- a/flink-staging/flink-scala-shell/pom.xml
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -76,12 +76,6 @@ under the License.
 			<version>${scala.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>jline</artifactId>
-			<version>2.10.4</version>
-		</dependency>
-
 		<!-- tests -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -180,6 +174,7 @@ under the License.
 						<configuration>
 							<sources>
 								<source>src/main/scala</source>
+								<source>src/main/scala-${scala.binary.version}</source>
 							</sources>
 						</configuration>
 					</execution>
@@ -274,6 +269,12 @@ under the License.
 					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
 					<version>${scala.macros.version}</version>
 				</dependency>
+
+				<dependency>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>jline</artifactId>
+					<version>2.10.4</version>
+				</dependency>
 			</dependencies>
 		</profile>
 	</profiles>

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..797b420
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+    extends ILoop(in0, out0) {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..c1be6db
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+    extends ILoop(in0, out0) {
+
+  protected def addThunk(f: => Unit): Unit = f
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
deleted file mode 100644
index cd8a846..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.{BufferedReader, File, FileOutputStream}
-
-import scala.tools.nsc.interpreter._
-
-import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
-import org.apache.flink.util.AbstractID
-
-
-class FlinkILoop(
-    val host: String,
-    val port: Int,
-    val externalJars: Option[Array[String]],
-    in0: Option[BufferedReader],
-    out0: JPrintWriter)
-  extends ILoop(in0, out0) {
-  
-  
-  
-  def this(host:String, 
-           port:Int, 
-           externalJars : Option[Array[String]], 
-           in0: BufferedReader, 
-           out: JPrintWriter){
-    this(host:String, port:Int, externalJars, Some(in0), out)
-  }
-
-  def this(host:String, port:Int, externalJars : Option[Array[String]]){
-    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
-  }
-  
-  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
-    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
-  }
-  // remote environment
-  private val remoteEnv: ScalaShellRemoteEnvironment = {
-    // allow creation of environments
-    ScalaShellRemoteEnvironment.resetContextEnvironments()
-    
-    // create our environment that submits against the cluster (local or remote)
-    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
-    
-    // prevent further instantiation of environments
-    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
-    
-    remoteEnv
-  }
-
-  // local environment
-  val scalaEnv: ExecutionEnvironment = {
-    val scalaEnv = new ExecutionEnvironment(remoteEnv)
-    scalaEnv
-  }
-
-  addThunk {
-    intp.beQuietDuring {
-      // automatically imports the flink scala api
-      intp.addImports("org.apache.flink.api.scala._")
-      intp.addImports("org.apache.flink.api.common.functions._")
-      // with this we can access this object in the scala shell
-      intp.bindValue("env", this.scalaEnv)
-    }
-  }
-
-
-  /**
-   * creates a temporary directory to store compiled console files
-   */
-  private val tmpDirBase: File = {
-    // get unique temporary folder:
-    val abstractID: String = new AbstractID().toString
-    val tmpDir: File = new File(
-      System.getProperty("java.io.tmpdir"),
-      "scala_shell_tmp-" + abstractID)
-    if (!tmpDir.exists) {
-      tmpDir.mkdir
-    }
-    tmpDir
-  }
-
-  // scala_shell commands
-  private val tmpDirShell: File = {
-    new File(tmpDirBase, "scala_shell_commands")
-  }
-
-  // scala shell jar file name
-  private val tmpJarShell: File = {
-    new File(tmpDirBase, "scala_shell_commands.jar")
-  }
-
-
-  /**
-   * Packages the compiled classes of the current shell session into a Jar file for execution
-   * on a Flink cluster.
-   *
-   * @return The path of the created Jar file
-   */
-  def writeFilesToDisk(): File = {
-    val vd = intp.virtualDirectory
-
-    val vdIt = vd.iterator
-
-    for (fi <- vdIt) {
-      if (fi.isDirectory) {
-
-        val fiIt = fi.iterator
-
-        for (f <- fiIt) {
-
-          // directory for compiled line
-          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
-          lineDir.mkdirs()
-
-          // compiled classes for commands from shell
-          val writeFile = new File(lineDir.getAbsolutePath, f.name)
-          val outputStream = new FileOutputStream(writeFile)
-          val inputStream = f.input
-
-          // copy file contents
-          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
-
-          inputStream.close()
-          outputStream.close()
-        }
-      }
-    }
-
-    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
-
-    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
-
-    val jh: JarHelper = new JarHelper
-    jh.jarDir(compiledClasses, jarFilePath)
-
-    jarFilePath
-  }
-
-  /**
-   * CUSTOM START METHODS OVERRIDE:
-   */
-  override def prompt = "Scala-Flink> "
-
-  /**
-   * custom welcome message
-   */
-  override def printWelcome() {
-    echo(
-      // scalastyle:off
-      """
-                         \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
-                     \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
-                  \u2593\u2588\u2588\u2588\u2593\u2591\u2591        \u2592\u2592\u2592\u2593\u2588\u2588\u2592  \u2592
-                \u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591      \u2592\u2588\u2588\u2588\u2588
-                \u2588\u2588\u2592         \u2591\u2592\u2593\u2588\u2588\u2588\u2592    \u2592\u2588\u2592\u2588\u2592
-                  \u2591\u2593\u2588            \u2588\u2588\u2588   \u2593\u2591\u2592\u2588\u2588
-                    \u2593\u2588       \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
-                  \u2588\u2591 \u2588   \u2592\u2592\u2591       \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
-                  \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593      \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
-               \u2591\u2592\u2588\u2593\u2593\u2588\u2588       \u2593\u2588\u2592    \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
-         \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588         \u2592\u2588    \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
-        \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588           \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
-      \u2591\u2588\u2588\u2593  \u2591\u2588\u2591            \u2588  \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
-     \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591          \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591    \u2591\u2588\u2591\u2593  \u2593\u2591
-    \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592          \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591       \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
- \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588       \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591         \u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592
- \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592              \u2592\u2588\u2588\u2593           \u2591\u2588\u2592
- \u2593\u2588    \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591              \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593          \u2591\u2592\u2591 \u2593\u2588
- \u2588\u2588\u2593    \u2588\u2588\u2592    \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592            \u2593\u2588\u2588\u2588  \u2588
-\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   \u2591\u2593\u2593\u2592\u2591\u2591   \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591                  \u2591\u2592\u2593\u2592  \u2588\u2593
-\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591                            \u2588\u2593
-\u2588\u2588 \u2593\u2591\u2592\u2588   \u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593       \u2592\u2593\u2593\u2588\u2588\u2593    \u2593\u2592          \u2592\u2592\u2593
-\u2593\u2588\u2593 \u2593\u2592\u2588  \u2588\u2593\u2591  \u2591\u2592\u2593\u2593\u2588\u2588\u2592            \u2591\u2593\u2588\u2592   \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
- \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592  \u2592\u2593\u2593\u2592  \u2593\u2588                \u2588\u2591      \u2591\u2591\u2591\u2591   \u2591\u2588\u2592
- \u2593\u2588   \u2592\u2588\u2593   \u2591     \u2588\u2591                \u2592\u2588              \u2588\u2593
-  \u2588\u2593   \u2588\u2588         \u2588\u2591                 \u2593\u2593        \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
-   \u2588\u2593 \u2591\u2593\u2588\u2588\u2591       \u2593\u2592                  \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591    \u2592\u2588
-    \u2588\u2588   \u2593\u2588\u2593\u2591      \u2592                    \u2591\u2592\u2588\u2592\u2588\u2588\u2592      \u2593\u2593
-     \u2593\u2588\u2592   \u2592\u2588\u2593\u2592\u2591                         \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
-      \u2591\u2588\u2588\u2592    \u2592\u2593\u2593\u2592                     \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
-        \u2591\u2593\u2588\u2588\u2592                          \u2593\u2591  \u2592\u2588\u2593\u2588  \u2591\u2591\u2592\u2592\u2592
-            \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593  \u2593\u2591\u2592\u2588\u2591
-
-              F L I N K - S C A L A - S H E L L
-
-NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
-  * env.readTextFile("/path/to/data")
-  * env.execute("Program name")
-
-HINT: You can use print() on a DataSet to print the contents to this shell.
-      """
-    // scalastyle:on
-    )
-
-  }
-
-  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
deleted file mode 100644
index a4fae91..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-
-import scala.tools.nsc.Settings
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-
-
-object FlinkShell {
-
-  def main(args: Array[String]) {
-
-    // scopt, command line arguments
-    case class Config(
-        port: Int = -1,
-        host: String = "none",
-        externalJars: Option[Array[String]] = None)
-    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
-      head ("Flink Scala Shell")
-
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-      
-      help("help") text("prints this usage text")
-    }
-
-
-    // parse arguments
-    parser.parse (args, Config () ) match {
-      case Some(config) =>
-        startShell(config.host,config.port,config.externalJars)
-
-      case _ => println("Could not parse program arguments")
-    }
-  }
-
-
-  def startShell(
-      userHost : String, 
-      userPort : Int, 
-      externalJars : Option[Array[String]] = None): Unit ={
-    
-    println("Starting Flink Shell:")
-
-    var cluster: LocalFlinkMiniCluster = null
-
-    // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
-    }
-    
-    // custom shell
-    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
-    repl.settings = new Settings()
-
-    repl.settings.usejavacp.value = true
-
-    // start scala interpreter shell
-    repl.process(repl.settings)
-
-    //repl.closeInterpreter()
-
-    if (cluster != null) {
-      cluster.stop()
-    }
-
-    println(" good bye ..")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..bcf9bc2
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
+import org.apache.flink.util.AbstractID
+
+import scala.tools.nsc.interpreter._
+
+
+class FlinkILoop(
+    val host: String,
+    val port: Int,
+    val externalJars: Option[Array[String]],
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+  extends ILoopCompat(in0, out0) {
+
+  def this(host:String, 
+           port:Int, 
+           externalJars : Option[Array[String]], 
+           in0: BufferedReader, 
+           out: JPrintWriter){
+    this(host:String, port:Int, externalJars, Some(in0), out)
+  }
+
+  def this(host:String, port:Int, externalJars : Option[Array[String]]){
+    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+  }
+  
+  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
+  }
+
+  // remote environment
+  private val remoteEnv: ScalaShellRemoteEnvironment = {
+    // allow creation of environments
+    ScalaShellRemoteEnvironment.resetContextEnvironments()
+    
+    // create our environment that submits against the cluster (local or remote)
+    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    
+    // prevent further instantiation of environments
+    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+    
+    remoteEnv
+  }
+
+  // local environment
+  val scalaEnv: ExecutionEnvironment = {
+    val scalaEnv = new ExecutionEnvironment(remoteEnv)
+    scalaEnv
+  }
+
+  /**
+   * creates a temporary directory to store compiled console files
+   */
+  private val tmpDirBase: File = {
+    // get unique temporary folder:
+    val abstractID: String = new AbstractID().toString
+    val tmpDir: File = new File(
+      System.getProperty("java.io.tmpdir"),
+      "scala_shell_tmp-" + abstractID)
+    if (!tmpDir.exists) {
+      tmpDir.mkdir
+    }
+    tmpDir
+  }
+
+  // scala_shell commands
+  private val tmpDirShell: File = {
+    new File(tmpDirBase, "scala_shell_commands")
+  }
+
+  // scala shell jar file name
+  private val tmpJarShell: File = {
+    new File(tmpDirBase, "scala_shell_commands.jar")
+  }
+
+  private val packageImports = Seq[String](
+    "org.apache.flink.api.scala._",
+    "org.apache.flink.api.common.functions._"
+  )
+
+  override def createInterpreter(): Unit = {
+    super.createInterpreter()
+
+    addThunk {
+      intp.beQuietDuring {
+        // import dependencies
+        intp.interpret("import " + packageImports.mkString(", "))
+
+        // set execution environment
+        intp.bind("env", this.scalaEnv)
+      }
+    }
+  }
+
+  /**
+   * Packages the compiled classes of the current shell session into a Jar file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
+   */
+  def writeFilesToDisk(): File = {
+    val vd = intp.virtualDirectory
+
+    val vdIt = vd.iterator
+
+    for (fi <- vdIt) {
+      if (fi.isDirectory) {
+
+        val fiIt = fi.iterator
+
+        for (f <- fiIt) {
+
+          // directory for compiled line
+          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+          lineDir.mkdirs()
+
+          // compiled classes for commands from shell
+          val writeFile = new File(lineDir.getAbsolutePath, f.name)
+          val outputStream = new FileOutputStream(writeFile)
+          val inputStream = f.input
+
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+          inputStream.close()
+          outputStream.close()
+        }
+      }
+    }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
+  }
+
+  /**
+   * CUSTOM START METHODS OVERRIDE:
+   */
+  override def prompt = "Scala-Flink> "
+
+  /**
+   * custom welcome message
+   */
+  override def printWelcome() {
+    echo(
+      // scalastyle:off
+      """
+                         \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
+                     \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
+                  \u2593\u2588\u2588\u2588\u2593\u2591\u2591        \u2592\u2592\u2592\u2593\u2588\u2588\u2592  \u2592
+                \u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591      \u2592\u2588\u2588\u2588\u2588
+                \u2588\u2588\u2592         \u2591\u2592\u2593\u2588\u2588\u2588\u2592    \u2592\u2588\u2592\u2588\u2592
+                  \u2591\u2593\u2588            \u2588\u2588\u2588   \u2593\u2591\u2592\u2588\u2588
+                    \u2593\u2588       \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
+                  \u2588\u2591 \u2588   \u2592\u2592\u2591       \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
+                  \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593      \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
+               \u2591\u2592\u2588\u2593\u2593\u2588\u2588       \u2593\u2588\u2592    \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
+         \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588         \u2592\u2588    \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
+        \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588           \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
+      \u2591\u2588\u2588\u2593  \u2591\u2588\u2591            \u2588  \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
+     \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591          \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591    \u2591\u2588\u2591\u2593  \u2593\u2591
+    \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592          \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591       \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
+ \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588       \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591         \u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592
+ \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592              \u2592\u2588\u2588\u2593           \u2591\u2588\u2592
+ \u2593\u2588    \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591              \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593          \u2591\u2592\u2591 \u2593\u2588
+ \u2588\u2588\u2593    \u2588\u2588\u2592    \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592            \u2593\u2588\u2588\u2588  \u2588
+\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   \u2591\u2593\u2593\u2592\u2591\u2591   \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591                  \u2591\u2592\u2593\u2592  \u2588\u2593
+\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591                            \u2588\u2593
+\u2588\u2588 \u2593\u2591\u2592\u2588   \u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593       \u2592\u2593\u2593\u2588\u2588\u2593    \u2593\u2592          \u2592\u2592\u2593
+\u2593\u2588\u2593 \u2593\u2592\u2588  \u2588\u2593\u2591  \u2591\u2592\u2593\u2593\u2588\u2588\u2592            \u2591\u2593\u2588\u2592   \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
+ \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592  \u2592\u2593\u2593\u2592  \u2593\u2588                \u2588\u2591      \u2591\u2591\u2591\u2591   \u2591\u2588\u2592
+ \u2593\u2588   \u2592\u2588\u2593   \u2591     \u2588\u2591                \u2592\u2588              \u2588\u2593
+  \u2588\u2593   \u2588\u2588         \u2588\u2591                 \u2593\u2593        \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
+   \u2588\u2593 \u2591\u2593\u2588\u2588\u2591       \u2593\u2592                  \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591    \u2592\u2588
+    \u2588\u2588   \u2593\u2588\u2593\u2591      \u2592                    \u2591\u2592\u2588\u2592\u2588\u2588\u2592      \u2593\u2593
+     \u2593\u2588\u2592   \u2592\u2588\u2593\u2592\u2591                         \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
+      \u2591\u2588\u2588\u2592    \u2592\u2593\u2593\u2592                     \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
+        \u2591\u2593\u2588\u2588\u2592                          \u2593\u2591  \u2592\u2588\u2593\u2588  \u2591\u2591\u2592\u2592\u2592
+            \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593  \u2593\u2591\u2592\u2588\u2591
+
+              F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+  * env.readTextFile("/path/to/data")
+  * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+      """
+    // scalastyle:on
+    )
+
+  }
+
+  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..224983b
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.Settings
+
+
+object FlinkShell {
+
+  def main(args: Array[String]) {
+
+    // scopt, command line arguments
+    case class Config(
+        port: Int = -1,
+        host: String = "none",
+        externalJars: Option[Array[String]] = None)
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head ("Flink Scala Shell")
+
+      opt[Int] ('p', "port") action {
+        (x, c) =>
+          c.copy (port = x)
+      } text("port specifies port of running JobManager")
+
+      opt[(String)] ('h',"host") action {
+        case (x, c) =>
+          c.copy (host = x)
+      }  text("host specifies host name of running JobManager")
+
+      opt[(String)] ('a',"addclasspath") action {
+        case (x,c) =>
+          val xArray = x.split(":")
+          c.copy(externalJars = Option(xArray))
+      } text("specifies additional jars to be used in Flink")
+      
+      help("help") text("prints this usage text")
+    }
+
+
+    // parse arguments
+    parser.parse (args, Config () ) match {
+      case Some(config) =>
+        startShell(config.host,config.port,config.externalJars)
+
+      case _ => println("Could not parse program arguments")
+    }
+  }
+
+
+  def startShell(
+      userHost : String, 
+      userPort : Int, 
+      externalJars : Option[Array[String]] = None): Unit ={
+    
+    println("Starting Flink Shell:")
+
+    var cluster: LocalFlinkMiniCluster = null
+
+    // either port or userhost not specified by user, create new minicluster
+    val (host,port) = if (userHost == "none" || userPort == -1 ) {
+      println("Creating new local server")
+      cluster = new LocalFlinkMiniCluster(new Configuration, false)
+      cluster.start()
+      ("localhost",cluster.getLeaderRPCPort)
+    } else {
+      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
+      (userHost, userPort)
+    }
+    
+    // custom shell
+    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
+
+    repl.settings = new Settings()
+
+    repl.settings.usejavacp.value = true
+
+    // start scala interpreter shell
+    repl.process(repl.settings)
+
+    //repl.closeInterpreter()
+
+    if (cluster != null) {
+      cluster.stop()
+    }
+
+    println(" good bye ..")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 7648c50..0621351 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -22,7 +22,7 @@ import java.io._
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 271d26c..67aec5a 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -46,6 +46,7 @@ under the License.
 		<module>flink-ml</module>
 		<module>flink-language-binding</module>
 		<module>flink-gelly-scala</module>
+		<module>flink-scala-shell</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->
@@ -71,22 +72,5 @@ under the License.
 				<module>flink-tez</module>
 			</modules>
 		</profile>
-		<profile>
-			<id>scala-2.10</id>
-			<activation>
-
-				<property>
-					<!-- this is the default scala profile -->
-					<name>!scala-2.11</name>
-				</property>
-			</activation>
-			<properties>
-				<scala.version>2.10.4</scala.version>
-				<scala.binary.version>2.10</scala.binary.version>
-			</properties>
-			<modules>
-				<module>flink-scala-shell</module>
-			</modules>
-		</profile>
 	</profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a5ff01..7e90ad6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,7 +370,7 @@ under the License.
 				</property>
 			</activation>
 			<properties>
-				<scala.version>2.11.4</scala.version>
+				<scala.version>2.11.7</scala.version>
 				<scala.binary.version>2.11</scala.binary.version>
 			</properties>
 		</profile>