Posted to commits@flink.apache.org by al...@apache.org on 2015/05/28 15:49:00 UTC

[3/3] flink git commit: [FLINK-1907] Add Flink Scala Shell

[FLINK-1907] Add Flink Scala Shell

The shell can either connect to a running Flink cluster or run in local
mode, with a cluster running in the same JVM.

This only works for Scala 2.10 for now because the Scala REPL was
modified in 2.11.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717f8812
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717f8812
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717f8812

Branch: refs/heads/master
Commit: 717f8812d8aa4c8a813e4ff1c3d36127ec1d87b8
Parents: 7f5edeb
Author: fleischhauf <ni...@googlemail.com>
Authored: Tue May 12 21:54:20 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu May 28 15:48:17 2015 +0200

----------------------------------------------------------------------
 docs/scala_shell_quickstart.md                  |  72 +++++
 flink-dist/pom.xml                              |  23 ++
 flink-dist/src/main/assemblies/bin.xml          |   7 +
 .../flink/api/java/RemoteEnvironment.java       |  10 +
 flink-staging/flink-scala-shell/pom.xml         | 281 +++++++++++++++++++
 .../org.apache.flink/api/java/JarHelper.java    | 211 ++++++++++++++
 .../api/java/ScalaShellRemoteEnvironment.java   |  83 ++++++
 .../org.apache.flink/api/scala/FlinkILoop.scala | 202 +++++++++++++
 .../org.apache.flink/api/scala/FlinkShell.scala |  96 +++++++
 .../src/test/resources/log4j-test.properties    |  24 ++
 .../src/test/resources/logback-test.xml         |  29 ++
 .../flink/api/scala/ScalaShellITSuite.scala     | 208 ++++++++++++++
 .../start-script/start-scala-shell.sh           |  58 ++++
 .../util/StreamingMultipleProgramsTestBase.java |   2 +-
 flink-staging/pom.xml                           |  17 ++
 .../flink/test/util/AbstractTestBase.java       |   6 +-
 .../test/util/MultipleProgramsTestBase.java     |   2 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   9 +-
 .../apache/flink/test/util/TestEnvironment.java |   2 +-
 .../apache/flink/test/util/FlinkTestBase.scala  |   2 +-
 20 files changed, 1333 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/docs/scala_shell_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/scala_shell_quickstart.md b/docs/scala_shell_quickstart.md
new file mode 100644
index 0000000..4ad7f5b
--- /dev/null
+++ b/docs/scala_shell_quickstart.md
@@ -0,0 +1,72 @@
+---
+title: "Quickstart: Scala Shell"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+Start working on your Flink Scala program in a few simple steps.
+
+## Starting the Flink interactive Scala shell
+
+Flink has an integrated interactive Scala shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use it in a local setup, just execute:
+
+__Sample Input__:
+~~~bash
+flink/bin/start-scala-shell.sh 
+~~~
+
+The shell will then start a local JobManager by itself.
+
+To use it in a cluster setup, supply the host and port of the JobManager:
+
+__Sample Input__:
+~~~bash
+flink/bin/start-scala-shell.sh -host "<hostname>" -port <portnumber>
+~~~
+
+
+## Usage
+
+The shell prebinds the ExecutionEnvironment as "env". So far, only batch mode is supported.
+
+The following example executes the WordCount program in the Scala shell:
+
+~~~scala
+Scala-Flink> val text = env.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
+Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
+Scala-Flink> counts.print()
+~~~
+
+
+The print() command automatically sends the specified program to the JobManager for execution and shows the result of the computation in the terminal.
+
+It is possible to write results to a file, as in the standard Scala API. However, in this case you need to call the following to run your program:
+
+~~~scala
+Scala-Flink> env.execute("MyProgram")
+~~~
+
+The Flink Shell comes with command history and autocompletion.
+

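For illustration, the file-output note in the quickstart above could look like this in a session: write the word counts with a standard DataSet sink, then trigger execution explicitly. This is a minimal sketch continuing the WordCount example; the output path is purely illustrative:

~~~scala
Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.writeAsText("/tmp/wordcount-result")
Scala-Flink> env.execute("MyProgram")
~~~
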
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index de9ed8a..ae9e8c9 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -123,6 +123,29 @@ under the License.
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
 		<profile>
+			<id>scala-2.10</id>
+			<activation>
+
+				<property>
+					<!-- this is the default scala profile -->
+					<name>!scala-2.11</name>
+				</property>
+			</activation>
+
+			<properties>
+				<scala.version>2.10.4</scala.version>
+				<scala.binary.version>2.10</scala.binary.version>
+			</properties>
+
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.flink</groupId>
+					<artifactId>flink-scala-shell</artifactId>
+					<version>${project.version}</version>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
 			<id>include-yarn</id>
 			<activation>
 				<property>

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 6a429ee..802799c 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -50,6 +50,12 @@ under the License.
 			<outputDirectory>bin</outputDirectory>
 			<fileMode>0755</fileMode>
 		</fileSet>
+		<!-- flink scala shell-->
+		<fileSet>
+			<directory>../flink-staging/flink-scala-shell/start-script/</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
 		<fileSet>
 			<!-- copy yarn start scripts -->
 			<directory>src/main/flink-bin/yarn-bin</directory>
@@ -114,6 +120,7 @@ under the License.
 			</excludes>
 		</fileSet>
 
+
 		<fileSet>
 			<!-- copy jar files of java examples -->
 			<directory>../flink-examples/flink-java-examples/target</directory>

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 515037c..a2f2891 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -87,4 +87,14 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
+
+
+	// needed to call execute on ScalaShellRemoteEnvironment
+	public int getPort() {
+		return this.port;
+	}
+
+	public String getHost() {
+		return this.host;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
new file mode 100644
index 0000000..7a5b485
--- /dev/null
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -0,0 +1,281 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-scala-shell</artifactId>
+	<name>flink-scala-shell</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- scala command line parsing -->
+		<dependency>
+			<groupId>com.github.scopt</groupId>
+            <artifactId>scopt_${scala.binary.version}</artifactId>
+		</dependency>
+
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-compiler</artifactId>
+			<version>${scala.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<version>${scala.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-reflect</artifactId>
+			<version>${scala.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>jline</artifactId>
+			<version>2.10.4</version>
+		</dependency>
+
+		<!-- tests -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins combine.children="append">
+						<compilerPlugin>
+							<groupId>org.scalamacros</groupId>
+							<artifactId>paradise_${scala.version}</artifactId>
+							<version>${scala.macros.version}</version>
+						</compilerPlugin>
+					</compilerPlugins>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalatest</groupId>
+				<artifactId>scalatest-maven-plugin</artifactId>
+				<version>1.0</version>
+				<configuration>
+					<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+					<stdout>W</stdout> <!-- Skip coloring output -->
+				</configuration>
+				<executions>
+					<execution>
+						<id>scala-test</id>
+						<goals>
+							<goal>test</goal>
+						</goals>
+						<configuration>
+							<suffixes>(?&lt;!(IT|Integration))(Test|Suite|Case)</suffixes>
+							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+						</configuration>
+					</execution>
+					<execution>
+						<id>integration-test</id>
+						<phase>integration-test</phase>
+						<goals>
+							<goal>test</goal>
+						</goals>
+						<configuration>
+							<suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
+							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<id>scala-2.10</id>
+			<activation>
+				<property>
+					<!-- this is the default scala profile -->
+					<name>!scala-2.11</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.scalamacros</groupId>
+					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+					<version>${scala.macros.version}</version>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
new file mode 100644
index 0000000..5def4b0
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <pc...@bea.com>
+ */
+public class JarHelper
+{
+	// ========================================================================
+	// Constants
+
+	private static final int BUFFER_SIZE = 2156;
+
+	// ========================================================================
+	// Variables
+
+	private byte[] mBuffer = new byte[BUFFER_SIZE];
+	private int mByteCount = 0;
+	private boolean mVerbose = false;
+	private String mDestJarName = "";
+
+	// ========================================================================
+	// Constructor
+
+	/**
+	 * Instantiates a new JarHelper.
+	 */
+	public JarHelper() {}
+
+	// ========================================================================
+	// Public methods
+
+	/**
+	 * Jars a given directory or single file into a JarOutputStream.
+	 */
+	public void jarDir(File dirOrFile2Jar, File destJar)
+		throws IOException {
+
+	if (dirOrFile2Jar == null || destJar == null)
+	{
+		throw new IllegalArgumentException();
+	}
+
+	mDestJarName = destJar.getCanonicalPath();
+	FileOutputStream fout = new FileOutputStream(destJar);
+	JarOutputStream jout = new JarOutputStream(fout);
+	//jout.setLevel(0);
+	try {
+		jarDir(dirOrFile2Jar, jout, null);
+	} catch(IOException ioe) {
+		throw ioe;
+	} finally {
+		jout.close();
+		fout.close();
+	}
+	}
+
+	/**
+	 * Unjars a given jar file into a given directory.
+	 */
+	public void unjarDir(File jarFile, File destDir) throws IOException {
+	BufferedOutputStream dest = null;
+	FileInputStream fis = new FileInputStream(jarFile);
+	unjar(fis, destDir);
+	}
+
+	/**
+	 * Given an InputStream on a jar file, unjars the contents into the given
+	 * directory.
+	 */
+	public void unjar(InputStream in, File destDir) throws IOException {
+	BufferedOutputStream dest = null;
+	JarInputStream jis = new JarInputStream(in);
+	JarEntry entry;
+	while ((entry = jis.getNextJarEntry()) != null) {
+		if (entry.isDirectory()) {
+		File dir = new File(destDir,entry.getName());
+		dir.mkdir();
+		if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
+		continue;
+		}
+		int count;
+		byte[] data = new byte[ BUFFER_SIZE ];
+		File destFile = new File(destDir, entry.getName());
+		if (mVerbose) {
+			System.out.println("unjarring " + destFile +
+					" from " + entry.getName());
+		}
+		FileOutputStream fos = new FileOutputStream(destFile);
+		dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+		while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+		dest.write(data, 0, count);
+		}
+		dest.flush();
+		dest.close();
+		if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
+	}
+	jis.close();
+	}
+
+	public void setVerbose(boolean b) {
+	mVerbose = b;
+	}
+
+	// ========================================================================
+	// Private methods
+
+	private static final char SEP = '/';
+	/**
+	 * Recursively jars up the given path under the given directory.
+	 */
+	private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+		throws IOException {
+	if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
+	if (dirOrFile2jar.isDirectory()) {
+		String[] dirList = dirOrFile2jar.list();
+		String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
+		if (path != null) {
+		JarEntry je = new JarEntry(subPath);
+		je.setTime(dirOrFile2jar.lastModified());
+		jos.putNextEntry(je);
+		jos.flush();
+		jos.closeEntry();
+		}
+		for (int i = 0; i < dirList.length; i++) {
+		File f = new File(dirOrFile2jar, dirList[i]);
+		jarDir(f,jos,subPath);
+		}
+	} else {
+		if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
+		{
+		if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
+		return;
+		}
+
+		if (mVerbose) {
+			System.out.println("adding " + dirOrFile2jar.getPath());
+		}
+		FileInputStream fis = new FileInputStream(dirOrFile2jar);
+		try {
+		JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
+		entry.setTime(dirOrFile2jar.lastModified());
+		jos.putNextEntry(entry);
+		while ((mByteCount = fis.read(mBuffer)) != -1) {
+			jos.write(mBuffer, 0, mByteCount);
+			if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
+		}
+		jos.flush();
+		jos.closeEntry();
+		} catch (IOException ioe) {
+		throw ioe;
+		} finally {
+		fis.close();
+		}
+	}
+	}
+
+	// for debugging
+	public static void main(String[] args)
+		throws IOException
+	{
+	if (args.length < 2)
+	{
+		System.err.println("Usage: JarHelper jarname.jar directory");
+		return;
+	}
+
+	JarHelper jarHelper = new JarHelper();
+	jarHelper.mVerbose = true;
+
+	File destJar = new File(args[0]);
+	File dirOrFile2Jar = new File(args[1]);
+
+	jarHelper.jarDir(dirOrFile2Jar, destJar);
+	}
+}

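The two public entry points of JarHelper mirror each other: jarDir() packs a directory (or single file) into a jar, and unjarDir() unpacks a jar into a directory — the former is what ScalaShellRemoteEnvironment below does with the shell's compiled classes. A minimal usage sketch in Scala, with purely illustrative paths:

~~~scala
import java.io.File

import org.apache.flink.api.java.JarHelper

object JarHelperExample {
  def main(args: Array[String]): Unit = {
    val helper = new JarHelper()
    helper.setVerbose(true) // print each file as it is processed

    // pack a directory of compiled classes into a jar (paths are illustrative)
    helper.jarDir(new File("/tmp/scala_shell_commands"), new File("/tmp/scala_shell_commands.jar"))

    // unpack the jar again; the destination directory must already exist
    val target = new File("/tmp/unpacked")
    target.mkdirs()
    helper.unjarDir(new File("/tmp/scala_shell_commands.jar"), target)
  }
}
~~~
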
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
new file mode 100644
index 0000000..cb470c9
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
@@ -0,0 +1,83 @@
+
+package org.apache.flink.api.java;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+
+import org.apache.flink.api.scala.FlinkILoop;
+
+import java.io.File;
+
+/**
+ * ScalaShellRemoteEnvironment references the JobManager through host and port parameters,
+ * and the Scala shell (FlinkILoop).
+ * Upon calling execute(), it reads the lines compiled in the Scala shell, aggregates them
+ * into a jar and ships that jar to the JobManager.
+ */
+
+public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
+
+	// reference to Scala Shell, for access to virtual directory
+	private FlinkILoop flinkILoop;
+
+	/**
+	 * Creates a new ScalaShellRemoteEnvironment that holds a reference to the FlinkILoop.
+	 *
+	 * @param host	   The host name or address of the master (JobManager), where the program should be executed.
+	 * @param port	   The port of the master (JobManager), where the program should be executed.
+	 * @param flinkILoop The FlinkILoop instance from which the ScalaShellRemoteEnvironment is called.
+	 */
+	public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
+		super(host, port, jarFiles);
+		this.flinkILoop = flinkILoop;
+	}
+
+	/**
+	 * Compiles a jar from the files in the shell's virtual directory on the fly, ships it and executes the plan in the remote environment.
+	 *
+	 * @param jobName name of the job as string
+	 * @return Result of the computation
+	 * @throws Exception
+	 */
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		Plan p = createProgramPlan(jobName);
+
+		// write virtual files to disk first
+		JarHelper jh = new JarHelper();
+
+		flinkILoop.writeFilesToDisk();
+
+		// jar it up
+		File inFile = new File( flinkILoop.getTmpDirShell().getAbsolutePath());
+		File outFile = new File( flinkILoop.getTmpJarShell().getAbsolutePath());
+
+		jh.jarDir(inFile, outFile);
+
+		String[] jarFiles = {outFile.getAbsolutePath()};
+
+		// call "traditional" execution methods
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(super.getHost(), super.getPort(), jarFiles);
+		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
+		return executor.executePlan(p);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..0de4953
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter._
+
+import org.apache.flink.api.java.ScalaShellRemoteEnvironment
+import org.apache.flink.util.AbstractID
+
+
+class FlinkILoop(val host: String,
+                 val port: Int,
+                 in0: Option[BufferedReader],
+                 out0: JPrintWriter)
+  extends ILoop(in0, out0) {
+
+  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter) {
+    this(host, port, Some(in0), out)
+  }
+
+  def this(host: String, port: Int) {
+    this(host, port, None, new JPrintWriter(Console.out, true))
+  }
+  // remote environment
+  private val remoteEnv: ScalaShellRemoteEnvironment = {
+    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    remoteEnv
+  }
+
+  // local environment
+  val scalaEnv: ExecutionEnvironment = {
+    val scalaEnv = new ExecutionEnvironment(remoteEnv)
+    scalaEnv
+  }
+
+
+  /**
+   * imports and bindings added automatically at shell startup:
+   */
+
+  addThunk {
+    intp.beQuietDuring {
+      // automatically imports the flink scala api
+      intp.addImports("org.apache.flink.api.scala._")
+      intp.addImports("org.apache.flink.api.common.functions._")
+      // with this we can access this object in the scala shell
+      intp.bindValue("env", this.scalaEnv)
+    }
+  }
+
+
+
+  /**
+   * creates a temporary directory to store compiled console files
+   */
+  private val tmpDirBase: File = {
+    // get unique temporary folder:
+    val abstractID: String = new AbstractID().toString
+    val tmpDir: File = new File(
+      System.getProperty("java.io.tmpdir"),
+      "scala_shell_tmp-" + abstractID)
+    if (!tmpDir.exists) {
+      tmpDir.mkdir
+    }
+    tmpDir
+  }
+
+  // scala_shell commands
+  private val tmpDirShell: File = {
+    new File(tmpDirBase, "scala_shell_commands")
+  }
+
+  // scala shell jar file name
+  private val tmpJarShell: File = {
+    new File(tmpDirBase, "scala_shell_commands.jar")
+  }
+
+
+  /**
+   * writes the contents of the compiled lines that have been executed in the shell
+   * into a physical directory on disk, under the unique temporary folder
+   */
+  def writeFilesToDisk(): Unit = {
+    val vd = intp.virtualDirectory
+
+    val vdIt = vd.iterator
+
+    for (fi <- vdIt) {
+      if (fi.isDirectory) {
+
+        val fiIt = fi.iterator
+
+        for (f <- fiIt) {
+
+          // directory for compiled line
+          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+          lineDir.mkdirs()
+
+          // compiled classes for commands from shell
+          val writeFile = new File(lineDir.getAbsolutePath, f.name)
+          val outputStream = new FileOutputStream(writeFile)
+          val inputStream = f.input
+
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+          inputStream.close()
+          outputStream.close()
+        }
+      }
+    }
+  }
+
+  /**
+   * custom prompt:
+   */
+  override def prompt = "Scala-Flink> "
+
+  /**
+   * custom welcome message
+   */
+  override def printWelcome() {
+    echo(
+      """
+                         ▒▓██▓██▒
+                     ▓████▒▒█▓▒▓███▓▒
+                  ▓███▓░░        ▒▒▒▓██▒  ▒
+                ░██▒   ▒▒▓▓█▓▓▒░      ▒████
+                ██▒         ░▒▓███▒    ▒█▒█▒
+                  ░▓█            ███   ▓░▒██
+                    ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
+                  █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
+                  ████░   ▒▓█▓      ██▒▒▒ ▓███▒
+               ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
+         ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
+        ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
+      ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
+     ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
+    ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
+ ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
+ ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
+ ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
+ ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
+▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
+█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
+██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
+▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
+ ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
+ ▓█   ▒█▓   ░     █░                ▒█              █▓
+  █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
+   █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
+    ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
+     ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
+      ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
+        ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
+            ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
+
+              F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+  * env.readTextFile("/path/to/data")
+  * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+      """)
+  }
+
+  //  getter functions:
+  // get (root temporary folder)
+  def getTmpDirBase(): File = {
+    this.tmpDirBase
+  }
+
+  // get shell folder name inside tmp dir
+  def getTmpDirShell(): File = {
+    this.tmpDirShell
+  }
+
+  // get tmp jar file name
+  def getTmpJarShell(): File = {
+    this.tmpJarShell
+  }
+}

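FlinkILoop can also be driven programmatically, which is essentially what the ScalaShellITSuite below does: feed commands in through a BufferedReader and capture the shell's output. A minimal sketch, assuming a JobManager is reachable at the given address (host and port are placeholders):

~~~scala
import java.io.{BufferedReader, PrintWriter, StringReader, StringWriter}

import scala.tools.nsc.Settings

import org.apache.flink.api.scala.FlinkILoop

object FlinkILoopExample {
  def main(args: Array[String]): Unit = {
    // commands to feed into the shell; "env" is prebound by FlinkILoop
    val input = new BufferedReader(new StringReader(
      "env.fromElements(1, 2, 3).reduce(_ + _).print()\n"))
    val out = new StringWriter()

    // host and port of a running JobManager -- placeholder values here
    val repl = new FlinkILoop("localhost", 6123, input, new PrintWriter(out))

    repl.settings = new Settings()
    repl.settings.usejavacp.value = true

    // run all commands from the input, then shut the interpreter down
    repl.process(repl.settings)
    repl.closeInterpreter()

    println(out.toString)
  }
}
~~~
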
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..90615ec
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+
+
+
+object FlinkShell {
+
+  def main(args: Array[String]) {
+
+    // scopt, command line arguments
+    case class Config(port: Int = -1,
+                      host: String = "none")
+    val parser = new scopt.OptionParser[Config] ("scopt") {
+      head ("scopt", "3.x")
+      opt[Int] ('p', "port") action {
+        (x, c) =>
+          c.copy (port = x)
+      } text ("port specifies port of running JobManager")
+      opt[String] ('h', "host") action {
+        case (x, c) =>
+          c.copy (host = x)
+      }  text ("host specifies host name of running JobManager")
+      help("help") text("prints this usage text")
+
+    }
+
+
+    // parse arguments
+    parser.parse (args, Config () ) map {
+      config =>
+        startShell(config.host, config.port)
+    } getOrElse {
+      // arguments are bad, usage message will have been displayed
+      println("Could not parse program arguments")
+    }
+  }
+
+
+  def startShell(userHost: String, userPort: Int): Unit = {
+    println("Starting Flink Shell:")
+
+    var cluster: LocalFlinkMiniCluster = null
+
+    // if host or port was not specified by the user, create a new local minicluster
+    val (host, port) =
+      if (userHost == "none" || userPort == -1) {
+        println("Creating new local server")
+        cluster = new LocalFlinkMiniCluster(new Configuration, false)
+        ("localhost", cluster.getJobManagerRPCPort)
+      } else {
+        println(s"Connecting to remote server (host: $userHost, port: $userPort).")
+        (userHost, userPort)
+      }
+
+    // custom shell
+    val repl = new FlinkILoop(host, port)
+
+    repl.settings = new Settings()
+
+    repl.settings.usejavacp.value = true
+
+    // start scala interpreter shell
+    repl.process(repl.settings)
+
+    //repl.closeInterpreter()
+
+    if (cluster != null) {
+      cluster.stop()
+    }
+
+    println(" good bye ..")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties b/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9912b19
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml b/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
new file mode 100644
index 0000000..412592f
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+import java.net.URLClassLoader
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.FiniteDuration
+import scala.tools.nsc.Settings
+
+class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  test("Iteration test with iterative Pi example") {
+
+    val input : String =
+      """
+        val initial = env.fromElements(0)
+
+        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
+          val result = iterationInput.map { i =>
+            val x = Math.random()
+            val y = Math.random()
+            i + (if (x * x + y * y < 1) 1 else 0)
+          }
+          result
+        }
+        val result = count map { c => c / 10000.0 * 4 }
+        result.collect()
+    """.stripMargin
+
+    val output : String = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+  }
+
+  test("WordCount in Shell") {
+    val input = """
+        val text = env.fromElements("To be, or not to be,--that is the question:--",
+        "Whether 'tis nobler in the mind to suffer",
+        "The slings and arrows of outrageous fortune",
+        "Or to take arms against a sea of troubles,")
+
+        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
+        val result = counts.print()
+    """.stripMargin
+
+    val output = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+
+//    some of the words that should be included
+    output should include("(a,1)")
+    output should include("(whether,1)")
+    output should include("(to,4)")
+    output should include("(arrows,1)")
+  }
+
+  test("Sum 0..10, should be 55") {
+    val input : String =
+      """
+        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
+        val reduced = input.reduce(_+_)
+        reduced.print
+      """.stripMargin
+
+    val output : String = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+
+    output should include("55")
+  }
+
+  test("WordCount in Shell with custom case class") {
+    val input : String =
+      """
+      case class WC(word: String, count: Int)
+
+      val wordCounts = env.fromElements(
+        new WC("hello", 1),
+        new WC("world", 2),
+        new WC("world", 8))
+
+      val reduced = wordCounts.groupBy(0).sum(1)
+
+      reduced.print()
+      """.stripMargin
+
+    val output : String = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+
+    output should include("WC(hello,1)")
+    output should include("WC(world,10)")
+  }
+
+
+  /**
+   * Run the input using a Scala Shell and return the output of the shell.
+   * @param input commands to be processed in the shell
+   * @return output of shell
+   */
+  def processInShell(input: String): String = {
+
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val baos = new ByteArrayOutputStream()
+
+    val oldOut = System.out
+    System.setOut(new PrintStream(baos))
+
+    // new local cluster
+    val host = "localhost"
+    val port = cluster match {
+      case Some(c) => c.getJobManagerRPCPort
+
+      case _ => throw new RuntimeException("Test cluster not initialized.")
+    }
+
+    val cl = getClass.getClassLoader
+    val paths = new ArrayBuffer[String]
+    cl match {
+      case urlCl: URLClassLoader =>
+        for (url <- urlCl.getURLs) {
+          if (url.getProtocol == "file") {
+            paths += url.getFile
+          }
+        }
+      case _ =>
+    }
+
+    val classpath = paths.mkString(File.pathSeparator)
+
+    val repl = new FlinkILoop(host, port, in, new PrintWriter(out))
+
+    repl.settings = new Settings()
+
+    // use the java classpath (needed e.g. when running the shell from within IntelliJ)
+    repl.settings.usejavacp.value = true
+
+    repl.addedClasspath = classpath
+
+    repl.process(repl.settings)
+
+    repl.closeInterpreter()
+
+    System.setOut(oldOut)
+
+    val stdout = baos.toString
+
+    // reprint because ScalaTest fails if we don't
+    print(stdout)
+
+    out.toString + stdout
+  }
+
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
+  override def beforeAll(): Unit = {
+    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false, false)
+    val clusterEnvironment = new TestEnvironment(cl, parallelism)
+    clusterEnvironment.setAsContext()
+
+    cluster = Some(cl)
+  }
+
+  override def afterAll(): Unit = {
+    cluster.map(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
new file mode 100644
index 0000000..4bbd49a
--- /dev/null
+++ b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# from scala-lang 2.10.4
+
+# restore stty settings (echo in particular)
+function restoreSttySettings() {
+  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+    echo "restoring stty:"
+    echo "$saved_stty"
+  fi
+  stty $saved_stty
+  saved_stty=""
+}
+
+function onExit() {
+  [[ "$saved_stty" != "" ]] && restoreSttySettings
+  exit $scala_exit_status
+}
+
+
+# to reenable echo if we are interrupted before completing.
+trap onExit INT
+# save terminal settings
+saved_stty=$(stty -g 2>/dev/null)
+# clear on error so we don't later try to restore them
+if [[ ! $? ]]; then
+  saved_stty=""
+fi
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+FLINK_CLASSPATH=`constructFlinkClassPath`
+
+java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell "$@"
+
+#restore echo
+onExit
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 945ac07..a9ebd0b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -76,7 +76,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 
 	@BeforeClass
 	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false);
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, true);
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index bc7d101..f57f6a8 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -71,5 +71,22 @@ under the License.
 				<module>flink-tez</module>
 			</modules>
 		</profile>
+		<profile>
+			<id>scala-2.10</id>
+			<activation>
+
+				<property>
+					<!-- this is the default scala profile -->
+					<name>!scala-2.11</name>
+				</property>
+			</activation>
+			<properties>
+				<scala.version>2.10.4</scala.version>
+				<scala.binary.version>2.10</scala.binary.version>
+			</properties>
+			<modules>
+				<module>flink-scala-shell</module>
+			</modules>
+		</profile>
 	</profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index a5a7da9..a63f6ac 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -59,8 +59,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	//  Local Test Cluster Life Cycle
 	// --------------------------------------------------------------------------------------------
 
-	public void startCluster() throws Exception {
-		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false);
+	public void startCluster() throws Exception{
+		this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false, true);
 	}
 
 	public void stopCluster() throws Exception {
@@ -142,4 +142,4 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 8dab485..97d5db0 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -101,7 +101,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 
 	@BeforeClass
 	public static void setup() throws Exception{
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer);
+		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer, true);
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 29cb2a4..92d66d9 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -97,10 +97,11 @@ public class TestBaseUtils {
 	}
 	
 	
-	protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
+	public static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
 															int taskManagerNumSlots,
 															StreamingMode mode,
-															boolean startWebserver) throws Exception {
+															boolean startWebserver,
+															boolean singleActorSystem) throws Exception {
 		
 		logDir = File.createTempFile("TestBaseUtils-logdir", null);
 		Assert.assertTrue("Unable to delete temp file", logDir.delete());
@@ -121,10 +122,10 @@ public class TestBaseUtils {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
 		
-		return new ForkableFlinkMiniCluster(config, true, mode);
+		return new ForkableFlinkMiniCluster(config, singleActorSystem, mode);
 	}
 
-	protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+	public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
 		if (logDir != null) {
 			FileUtils.deleteDirectory(logDir);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 25f2c83..cd10cc6 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -79,7 +79,7 @@ public class TestEnvironment extends ExecutionEnvironment {
 		return pc.compile(p);
 	}
 
-	protected void setAsContext() {
+	public void setAsContext() {
 		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
 			@Override
 			public ExecutionEnvironment createExecutionEnvironment() {

http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 3ea8624..bac1ede 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -54,7 +54,7 @@ trait FlinkTestBase extends BeforeAndAfter {
   val parallelism = 4
 
   before {
-    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false)
+    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false, true)
     val clusterEnvironment = new TestEnvironment(cl, parallelism)
     clusterEnvironment.setAsContext()