You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/07 16:11:13 UTC

[1/2] flink git commit: [FLINK-2066][core] Add configuration of delay between execution retries at job level

Repository: flink
Updated Branches:
  refs/heads/master b489c3673 -> 8d62033c2


[FLINK-2066][core] Add configuration of delay between execution retries at job level

This closes #1223


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a437a2b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a437a2b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a437a2b3

Branch: refs/heads/master
Commit: a437a2b396eb473db5e649c3879c34b67fffc943
Parents: b489c36
Author: wangchx <wc...@gmail.com>
Authored: Sat Oct 3 17:53:31 2015 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 7 12:56:46 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  2 ++
 .../flink/api/common/ExecutionConfig.java       | 25 +++++++++++++++++-
 .../java/org/apache/flink/api/common/Plan.java  |  7 +++++
 .../flink/api/java/ExecutionEnvironment.java    | 25 ++++++++++++++++++
 .../plantranslate/JobGraphGenerator.java        |  1 +
 .../apache/flink/runtime/jobgraph/JobGraph.java | 27 ++++++++++++++++++++
 .../flink/api/scala/ExecutionEnvironment.scala  | 18 ++++++++++++-
 .../environment/StreamExecutionEnvironment.java | 24 +++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   | 15 +++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 17 ++++++++++++
 10 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 3959dc9..da141a9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1992,6 +1992,8 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
 
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully stopped on the TaskManagers, and once the delay has passed, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution retries is one or more.
+
 - `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner. 
 
 - `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers fail to handle a POJO properly.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index df0248a..9ed3e92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -34,6 +34,7 @@ import java.util.Objects;
  *     <li>The default parallelism of the program, i.e., how many parallel tasks to use for
  *         all functions that do not define a specific value directly.</li>
  *     <li>The number of retries in the case of failed executions.</li>
+ *     <li>The delay between execution retries.</li>
  *     <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
  *         The default execution mode is {@link ExecutionMode#PIPELINED}</li>
  *     <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
@@ -92,6 +93,8 @@ public class ExecutionConfig implements Serializable {
 	private long autoWatermarkInterval = 0;
 
 	private boolean timestampsEnabled = false;
+	
+	private long executionRetryDelay = -1;
 
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
@@ -242,6 +245,13 @@ public class ExecutionConfig implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numberOfExecutionRetries;
 	}
+	
+	/**
+	 * @return The delay between retries.
+	 */
+	public long getExecutionRetryDelay() {
+		return executionRetryDelay;
+	}
 
 	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of zero
@@ -258,7 +268,20 @@ public class ExecutionConfig implements Serializable {
 		this.numberOfExecutionRetries = numberOfExecutionRetries;
 		return this;
 	}
-
+	
+	/**
+	 * Sets the delay between execution retries. A value of {@code -1} indicates that the
+	 * default value should be used.
+	 * @param executionRetryDelay The number of milliseconds the system will wait to retry.
+	 */
+	public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
+		if (executionRetryDelay < -1 ) {
+			throw new IllegalArgumentException(
+					"The delay between reties must be non-negative, or -1 (use system default)");
+		}
+		this.executionRetryDelay = executionRetryDelay;
+		return this;
+	}
 	/**
 	 * Sets the execution mode to execute the program. The execution mode defines whether
 	 * data exchanges are performed in a batch or on a pipelined manner.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e0d1eb8..dc8d152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -303,6 +303,13 @@ public class Plan implements Visitable<Operator<?>> {
 	}
 	
 	/**
+	 * Gets the delay between retries of failed tasks.
+	 * @return The delay the system will wait to retry.
+	 */
+	public long getExecutionRetryDelay() {
+		return getExecutionConfig().getExecutionRetryDelay();
+	}
+	/**
 	 * Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
 	 * for data types and is specific to a particular data model (record, tuple, Scala, ...)
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a596765..01fb15c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -199,6 +199,31 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	/**
+	 * Sets the delay, in milliseconds, that the system waits before re-executing
+	 * failed tasks. A value of zero means failed tasks are re-executed without delay.
+	 * A value of {@code -1} indicates that the system default value (as defined in
+	 * the configuration) should be used.
+	 *
+	 * @param executionRetryDelay
+	 * 		The delay of time the system will wait to re-execute failed
+	 * 		tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay) {
+		config.setExecutionRetryDelay(executionRetryDelay);
+	}
+	
+	/**
+	 * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined
+	 * in the configuration) should be used.
+	 *
+	 * @return The delay time the system will wait to re-execute failed tasks.
+	 */
+	public long getExecutionRetryDelay() {
+		return config.getExecutionRetryDelay();
+	}
+	
+	/**
 	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
 	 * 
 	 * @return The execution result from the latest job execution.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c15e47a..afd0682 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -219,6 +219,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// create the job graph object
 		JobGraph graph = new JobGraph(jobId, program.getJobName());
 		graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
+		graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay());
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e4a0209..4014a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -79,6 +79,8 @@ public class JobGraph implements Serializable {
 	
 	/** The number of times that failed tasks should be re-executed */
 	private int numExecutionRetries;
+	
+	private long executionRetryDelay;
 
 	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 	 * job manager after it has been executed. */
@@ -211,6 +213,31 @@ public class JobGraph implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numExecutionRetries;
 	}
+	
+	/**
+	 * Gets the delay the system waits before re-executing failed tasks. A value of
+	 * {@code -1} indicates the system default value (as defined in the configuration)
+	 * should be used.
+	 * @return The delay in milliseconds the system waits before re-executing failed tasks.
+	 */
+	public long getExecutionRetryDelay() {
+		return executionRetryDelay;
+	}
+	
+	/**
+	 * Sets the delay the system waits before re-executing failed tasks. A value of zero
+	 * means failed tasks are re-executed without delay. A value of {@code -1} indicates that
+	 * the system default value (as defined in the configuration) should be used.
+	 * 
+	 * @param executionRetryDelay The delay of time the system will wait to re-execute failed tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay){
+		if (executionRetryDelay < -1) {
+			throw new IllegalArgumentException(
+					"The delay between reties must be non-negative, or -1 (use system default)");
+		}
+		this.executionRetryDelay = executionRetryDelay;
+	}
 
 	/**
 	 * Gets the timeout after which the corresponding ExecutionGraph is removed at the

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 3427225..e27d55a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -60,7 +60,7 @@ import scala.reflect.ClassTag
  *  - [[ExecutionEnvironment#createRemoteEnvironment]]
  *
  *  Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending
- *  on where the program is executed. If it is run inside an IDE a loca environment will be
+ *  on where the program is executed. If it is run inside an IDE a local environment will be
  *  created. If the program is submitted to a cluster a remote execution environment will
  *  be created.
  */
@@ -110,6 +110,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
   /**
+   * Sets the delay, in milliseconds, that the system waits before re-executing failed
+   * tasks. A value of zero means failed tasks are re-executed without delay. A value
+   * of "-1" indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+  /**
    * Gets the UUID by which this environment is identified. The UUID sets the execution context
    * in the cluster or local environment.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 598d0df..c2e2880 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -414,6 +414,30 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the delay, in milliseconds, that the system waits before re-executing
+	 * failed tasks. A value of zero means failed tasks are re-executed without delay.
+	 * A value of {@code -1} indicates that the system default value (as defined in
+	 * the configuration) should be used.
+	 *
+	 * @param executionRetryDelay
+	 * 		The delay of time the system will wait to re-execute failed
+	 * 		tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay){
+		config.setExecutionRetryDelay(executionRetryDelay);
+	}
+	
+	/**
+	 * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined
+	 * in the configuration) should be used.
+	 *
+	 * @return The delay time the system will wait to re-execute failed tasks.
+	 */
+	public long getExecutionRetryDelay(){
+		return config.getExecutionRetryDelay();
+	}
+	/**
 	 * Sets the default parallelism that will be used for the local execution
 	 * environment created by {@link #createLocalEnvironment()}.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8eb91a2..d8e81cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,6 +99,8 @@ public class StreamingJobGraphGenerator {
 		configureCheckpointing();
 
 		configureExecutionRetries();
+		
+		configureExecutionRetryDelay();
 
 		try {
 			InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
@@ -419,6 +421,10 @@ public class StreamingJobGraphGenerator {
 			if(executionRetries == -1) {
 				streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
 			}
+			long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+			if(executionRetryDelay == -1) {
+				streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
+			}
 		}
 	}
 
@@ -431,4 +437,13 @@ public class StreamingJobGraphGenerator {
 			jobGraph.setNumberOfExecutionRetries(0);
 		}
 	}
+	
+	private void configureExecutionRetryDelay() {
+		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+		if (executionRetryDelay != -1) {
+			jobGraph.setExecutionRetryDelay(executionRetryDelay);
+		} else {
+			jobGraph.setExecutionRetryDelay(100 * 1000);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2474d8c..7492e48 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -210,6 +210,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
+  /**
+   * Sets the delay, in milliseconds, that the system waits before re-executing failed
+   * tasks. A value of zero means failed tasks are re-executed without delay. A value
+   * of "-1" indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+
   // --------------------------------------------------------------------------------------------
   // Registry for types and serializers
   // --------------------------------------------------------------------------------------------


[2/2] flink git commit: [FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell. Update Scala 2.11 version and jline dependency.

Posted by fh...@apache.org.
[FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell.
Update Scala 2.11 version and jline dependency.

This closes #1197


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d62033c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d62033c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d62033c

Branch: refs/heads/master
Commit: 8d62033c23f50ac1c8ccca04b70c4fda1b8ba46c
Parents: a437a2b
Author: Chiwan Park <ch...@apache.org>
Authored: Sat Sep 26 02:47:06 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 7 12:57:40 2015 +0200

----------------------------------------------------------------------
 flink-dist/pom.xml                              |  29 +--
 flink-staging/flink-scala-shell/pom.xml         |  13 +-
 .../apache/flink/api/scala/ILoopCompat.scala    |  29 +++
 .../apache/flink/api/scala/ILoopCompat.scala    |  31 +++
 .../org.apache.flink/api/scala/FlinkILoop.scala | 218 ------------------
 .../org.apache.flink/api/scala/FlinkShell.scala | 108 ---------
 .../org/apache/flink/api/scala/FlinkILoop.scala | 224 +++++++++++++++++++
 .../org/apache/flink/api/scala/FlinkShell.scala | 107 +++++++++
 .../flink/api/scala/ScalaShellITSuite.scala     |   2 +-
 flink-staging/pom.xml                           |  18 +-
 pom.xml                                         |   2 +-
 11 files changed, 407 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 32059ea..f1745ed 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -125,34 +125,17 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala-shell</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
 		<profile>
-			<id>scala-2.10</id>
-			<activation>
-
-				<property>
-					<!-- this is the default scala profile -->
-					<name>!scala-2.11</name>
-				</property>
-			</activation>
-
-			<properties>
-				<scala.version>2.10.4</scala.version>
-				<scala.binary.version>2.10</scala.binary.version>
-			</properties>
-
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-scala-shell</artifactId>
-					<version>${project.version}</version>
-				</dependency>
-			</dependencies>
-		</profile>
-		<profile>
 			<id>include-yarn</id>
 			<activation>
 				<property>

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
index 94718a3..5adb8c6 100644
--- a/flink-staging/flink-scala-shell/pom.xml
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -76,12 +76,6 @@ under the License.
 			<version>${scala.version}</version>
 		</dependency>
 
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>jline</artifactId>
-			<version>2.10.4</version>
-		</dependency>
-
 		<!-- tests -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -180,6 +174,7 @@ under the License.
 						<configuration>
 							<sources>
 								<source>src/main/scala</source>
+								<source>src/main/scala-${scala.binary.version}</source>
 							</sources>
 						</configuration>
 					</execution>
@@ -274,6 +269,12 @@ under the License.
 					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
 					<version>${scala.macros.version}</version>
 				</dependency>
+
+				<dependency>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>jline</artifactId>
+					<version>2.10.4</version>
+				</dependency>
 			</dependencies>
 		</profile>
 	</profiles>

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..797b420
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+    extends ILoop(in0, out0) {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..c1be6db
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+    extends ILoop(in0, out0) {
+
+  protected def addThunk(f: => Unit): Unit = f
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
deleted file mode 100644
index cd8a846..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.{BufferedReader, File, FileOutputStream}
-
-import scala.tools.nsc.interpreter._
-
-import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
-import org.apache.flink.util.AbstractID
-
-
-class FlinkILoop(
-    val host: String,
-    val port: Int,
-    val externalJars: Option[Array[String]],
-    in0: Option[BufferedReader],
-    out0: JPrintWriter)
-  extends ILoop(in0, out0) {
-  
-  
-  
-  def this(host:String, 
-           port:Int, 
-           externalJars : Option[Array[String]], 
-           in0: BufferedReader, 
-           out: JPrintWriter){
-    this(host:String, port:Int, externalJars, Some(in0), out)
-  }
-
-  def this(host:String, port:Int, externalJars : Option[Array[String]]){
-    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
-  }
-  
-  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
-    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
-  }
-  // remote environment
-  private val remoteEnv: ScalaShellRemoteEnvironment = {
-    // allow creation of environments
-    ScalaShellRemoteEnvironment.resetContextEnvironments()
-    
-    // create our environment that submits against the cluster (local or remote)
-    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
-    
-    // prevent further instantiation of environments
-    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
-    
-    remoteEnv
-  }
-
-  // local environment
-  val scalaEnv: ExecutionEnvironment = {
-    val scalaEnv = new ExecutionEnvironment(remoteEnv)
-    scalaEnv
-  }
-
-  addThunk {
-    intp.beQuietDuring {
-      // automatically imports the flink scala api
-      intp.addImports("org.apache.flink.api.scala._")
-      intp.addImports("org.apache.flink.api.common.functions._")
-      // with this we can access this object in the scala shell
-      intp.bindValue("env", this.scalaEnv)
-    }
-  }
-
-
-  /**
-   * creates a temporary directory to store compiled console files
-   */
-  private val tmpDirBase: File = {
-    // get unique temporary folder:
-    val abstractID: String = new AbstractID().toString
-    val tmpDir: File = new File(
-      System.getProperty("java.io.tmpdir"),
-      "scala_shell_tmp-" + abstractID)
-    if (!tmpDir.exists) {
-      tmpDir.mkdir
-    }
-    tmpDir
-  }
-
-  // scala_shell commands
-  private val tmpDirShell: File = {
-    new File(tmpDirBase, "scala_shell_commands")
-  }
-
-  // scala shell jar file name
-  private val tmpJarShell: File = {
-    new File(tmpDirBase, "scala_shell_commands.jar")
-  }
-
-
-  /**
-   * Packages the compiled classes of the current shell session into a Jar file for execution
-   * on a Flink cluster.
-   *
-   * @return The path of the created Jar file
-   */
-  def writeFilesToDisk(): File = {
-    val vd = intp.virtualDirectory
-
-    val vdIt = vd.iterator
-
-    for (fi <- vdIt) {
-      if (fi.isDirectory) {
-
-        val fiIt = fi.iterator
-
-        for (f <- fiIt) {
-
-          // directory for compiled line
-          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
-          lineDir.mkdirs()
-
-          // compiled classes for commands from shell
-          val writeFile = new File(lineDir.getAbsolutePath, f.name)
-          val outputStream = new FileOutputStream(writeFile)
-          val inputStream = f.input
-
-          // copy file contents
-          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
-
-          inputStream.close()
-          outputStream.close()
-        }
-      }
-    }
-
-    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
-
-    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
-
-    val jh: JarHelper = new JarHelper
-    jh.jarDir(compiledClasses, jarFilePath)
-
-    jarFilePath
-  }
-
-  /**
-   * CUSTOM START METHODS OVERRIDE:
-   */
-  override def prompt = "Scala-Flink> "
-
-  /**
-   * custom welcome message
-   */
-  override def printWelcome() {
-    echo(
-      // scalastyle:off
-      """
-                         \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
-                     \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
-                  \u2593\u2588\u2588\u2588\u2593\u2591\u2591        \u2592\u2592\u2592\u2593\u2588\u2588\u2592  \u2592
-                \u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591      \u2592\u2588\u2588\u2588\u2588
-                \u2588\u2588\u2592         \u2591\u2592\u2593\u2588\u2588\u2588\u2592    \u2592\u2588\u2592\u2588\u2592
-                  \u2591\u2593\u2588            \u2588\u2588\u2588   \u2593\u2591\u2592\u2588\u2588
-                    \u2593\u2588       \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
-                  \u2588\u2591 \u2588   \u2592\u2592\u2591       \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
-                  \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593      \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
-               \u2591\u2592\u2588\u2593\u2593\u2588\u2588       \u2593\u2588\u2592    \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
-         \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588         \u2592\u2588    \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
-        \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588           \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
-      \u2591\u2588\u2588\u2593  \u2591\u2588\u2591            \u2588  \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
-     \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591          \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591    \u2591\u2588\u2591\u2593  \u2593\u2591
-    \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592          \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591       \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
- \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588       \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591         \u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592
- \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592              \u2592\u2588\u2588\u2593           \u2591\u2588\u2592
- \u2593\u2588    \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591              \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593          \u2591\u2592\u2591 \u2593\u2588
- \u2588\u2588\u2593    \u2588\u2588\u2592    \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592            \u2593\u2588\u2588\u2588  \u2588
-\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   \u2591\u2593\u2593\u2592\u2591\u2591   \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591                  \u2591\u2592\u2593\u2592  \u2588\u2593
-\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591                            \u2588\u2593
-\u2588\u2588 \u2593\u2591\u2592\u2588   \u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593       \u2592\u2593\u2593\u2588\u2588\u2593    \u2593\u2592          \u2592\u2592\u2593
-\u2593\u2588\u2593 \u2593\u2592\u2588  \u2588\u2593\u2591  \u2591\u2592\u2593\u2593\u2588\u2588\u2592            \u2591\u2593\u2588\u2592   \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
- \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592  \u2592\u2593\u2593\u2592  \u2593\u2588                \u2588\u2591      \u2591\u2591\u2591\u2591   \u2591\u2588\u2592
- \u2593\u2588   \u2592\u2588\u2593   \u2591     \u2588\u2591                \u2592\u2588              \u2588\u2593
-  \u2588\u2593   \u2588\u2588         \u2588\u2591                 \u2593\u2593        \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
-   \u2588\u2593 \u2591\u2593\u2588\u2588\u2591       \u2593\u2592                  \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591    \u2592\u2588
-    \u2588\u2588   \u2593\u2588\u2593\u2591      \u2592                    \u2591\u2592\u2588\u2592\u2588\u2588\u2592      \u2593\u2593
-     \u2593\u2588\u2592   \u2592\u2588\u2593\u2592\u2591                         \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
-      \u2591\u2588\u2588\u2592    \u2592\u2593\u2593\u2592                     \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
-        \u2591\u2593\u2588\u2588\u2592                          \u2593\u2591  \u2592\u2588\u2593\u2588  \u2591\u2591\u2592\u2592\u2592
-            \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593  \u2593\u2591\u2592\u2588\u2591
-
-              F L I N K - S C A L A - S H E L L
-
-NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
-  * env.readTextFile("/path/to/data")
-  * env.execute("Program name")
-
-HINT: You can use print() on a DataSet to print the contents to this shell.
-      """
-    // scalastyle:on
-    )
-
-  }
-
-  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
deleted file mode 100644
index a4fae91..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-
-import scala.tools.nsc.Settings
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-
-
-object FlinkShell {
-
-  def main(args: Array[String]) {
-
-    // scopt, command line arguments
-    case class Config(
-        port: Int = -1,
-        host: String = "none",
-        externalJars: Option[Array[String]] = None)
-    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
-      head ("Flink Scala Shell")
-
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-      
-      help("help") text("prints this usage text")
-    }
-
-
-    // parse arguments
-    parser.parse (args, Config () ) match {
-      case Some(config) =>
-        startShell(config.host,config.port,config.externalJars)
-
-      case _ => println("Could not parse program arguments")
-    }
-  }
-
-
-  def startShell(
-      userHost : String, 
-      userPort : Int, 
-      externalJars : Option[Array[String]] = None): Unit ={
-    
-    println("Starting Flink Shell:")
-
-    var cluster: LocalFlinkMiniCluster = null
-
-    // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
-    }
-    
-    // custom shell
-    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
-    repl.settings = new Settings()
-
-    repl.settings.usejavacp.value = true
-
-    // start scala interpreter shell
-    repl.process(repl.settings)
-
-    //repl.closeInterpreter()
-
-    if (cluster != null) {
-      cluster.stop()
-    }
-
-    println(" good bye ..")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..bcf9bc2
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
+import org.apache.flink.util.AbstractID
+
+import scala.tools.nsc.interpreter._
+
+
+class FlinkILoop(
+    val host: String,
+    val port: Int,
+    val externalJars: Option[Array[String]],
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
+  extends ILoopCompat(in0, out0) {
+
+  def this(host:String, 
+           port:Int, 
+           externalJars : Option[Array[String]], 
+           in0: BufferedReader, 
+           out: JPrintWriter){
+    this(host:String, port:Int, externalJars, Some(in0), out)
+  }
+
+  def this(host:String, port:Int, externalJars : Option[Array[String]]){
+    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+  }
+  
+  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
+  }
+
+  // remote environment
+  private val remoteEnv: ScalaShellRemoteEnvironment = {
+    // allow creation of environments
+    ScalaShellRemoteEnvironment.resetContextEnvironments()
+    
+    // create our environment that submits against the cluster (local or remote)
+    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    
+    // prevent further instantiation of environments
+    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+    
+    remoteEnv
+  }
+
+  // Scala API environment wrapping the remote environment; bound as "env" in the shell
+  val scalaEnv: ExecutionEnvironment = {
+    val scalaEnv = new ExecutionEnvironment(remoteEnv)
+    scalaEnv
+  }
+
+  /**
+   * Unique directory under java.io.tmpdir holding this shell's compiled classes and jar file
+   */
+  private val tmpDirBase: File = {
+    // unique folder name: fixed prefix plus a freshly generated AbstractID
+    val abstractID: String = new AbstractID().toString
+    val tmpDir: File = new File(
+      System.getProperty("java.io.tmpdir"),
+      "scala_shell_tmp-" + abstractID)
+    if (!tmpDir.exists) {
+      tmpDir.mkdir
+    }
+    tmpDir
+  }
+
+  // directory that receives the compiled classes of the shell commands (one sub-folder per line)
+  private val tmpDirShell: File = {
+    new File(tmpDirBase, "scala_shell_commands")
+  }
+
+  // jar file into which writeFilesToDisk() packages the compiled shell classes
+  private val tmpJarShell: File = {
+    new File(tmpDirBase, "scala_shell_commands.jar")
+  }
+
+  private val packageImports = Seq[String](
+    "org.apache.flink.api.scala._",
+    "org.apache.flink.api.common.functions._"
+  )
+
+  override def createInterpreter(): Unit = {
+    super.createInterpreter()
+
+    addThunk {
+      intp.beQuietDuring {
+        // import dependencies
+        intp.interpret("import " + packageImports.mkString(", "))
+
+        // set execution environment
+        intp.bind("env", this.scalaEnv)
+      }
+    }
+  }
+
+  /**
+   * Packages the compiled classes of the current shell session into a Jar file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
+   */
+  def writeFilesToDisk(): File = {
+    val vd = intp.virtualDirectory
+
+    val vdIt = vd.iterator
+
+    for (fi <- vdIt) {
+      if (fi.isDirectory) {
+
+        val fiIt = fi.iterator
+
+        for (f <- fiIt) {
+
+          // directory for compiled line
+          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+          lineDir.mkdirs()
+
+          // compiled classes for commands from shell
+          val writeFile = new File(lineDir.getAbsolutePath, f.name)
+          val outputStream = new FileOutputStream(writeFile)
+          val inputStream = f.input
+
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+          inputStream.close()
+          outputStream.close()
+        }
+      }
+    }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
+  }
+
+  /**
+   * CUSTOM START METHODS OVERRIDE:
+   */
+  override def prompt = "Scala-Flink> "
+
+  /**
+   * custom welcome message
+   */
+  override def printWelcome() {
+    echo(
+      // scalastyle:off
+      """
+                         \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
+                     \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
+                  \u2593\u2588\u2588\u2588\u2593\u2591\u2591        \u2592\u2592\u2592\u2593\u2588\u2588\u2592  \u2592
+                \u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591      \u2592\u2588\u2588\u2588\u2588
+                \u2588\u2588\u2592         \u2591\u2592\u2593\u2588\u2588\u2588\u2592    \u2592\u2588\u2592\u2588\u2592
+                  \u2591\u2593\u2588            \u2588\u2588\u2588   \u2593\u2591\u2592\u2588\u2588
+                    \u2593\u2588       \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
+                  \u2588\u2591 \u2588   \u2592\u2592\u2591       \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
+                  \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593      \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
+               \u2591\u2592\u2588\u2593\u2593\u2588\u2588       \u2593\u2588\u2592    \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
+         \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588         \u2592\u2588    \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
+        \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588           \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
+      \u2591\u2588\u2588\u2593  \u2591\u2588\u2591            \u2588  \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
+     \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591          \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591    \u2591\u2588\u2591\u2593  \u2593\u2591
+    \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592          \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591       \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
+ \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588       \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591         \u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592
+ \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592              \u2592\u2588\u2588\u2593           \u2591\u2588\u2592
+ \u2593\u2588    \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591              \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593          \u2591\u2592\u2591 \u2593\u2588
+ \u2588\u2588\u2593    \u2588\u2588\u2592    \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592            \u2593\u2588\u2588\u2588  \u2588
+\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   \u2591\u2593\u2593\u2592\u2591\u2591   \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591                  \u2591\u2592\u2593\u2592  \u2588\u2593
+\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591                            \u2588\u2593
+\u2588\u2588 \u2593\u2591\u2592\u2588   \u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593       \u2592\u2593\u2593\u2588\u2588\u2593    \u2593\u2592          \u2592\u2592\u2593
+\u2593\u2588\u2593 \u2593\u2592\u2588  \u2588\u2593\u2591  \u2591\u2592\u2593\u2593\u2588\u2588\u2592            \u2591\u2593\u2588\u2592   \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
+ \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592  \u2592\u2593\u2593\u2592  \u2593\u2588                \u2588\u2591      \u2591\u2591\u2591\u2591   \u2591\u2588\u2592
+ \u2593\u2588   \u2592\u2588\u2593   \u2591     \u2588\u2591                \u2592\u2588              \u2588\u2593
+  \u2588\u2593   \u2588\u2588         \u2588\u2591                 \u2593\u2593        \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
+   \u2588\u2593 \u2591\u2593\u2588\u2588\u2591       \u2593\u2592                  \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591    \u2592\u2588
+    \u2588\u2588   \u2593\u2588\u2593\u2591      \u2592                    \u2591\u2592\u2588\u2592\u2588\u2588\u2592      \u2593\u2593
+     \u2593\u2588\u2592   \u2592\u2588\u2593\u2592\u2591                         \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
+      \u2591\u2588\u2588\u2592    \u2592\u2593\u2593\u2592                     \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
+        \u2591\u2593\u2588\u2588\u2592                          \u2593\u2591  \u2592\u2588\u2593\u2588  \u2591\u2591\u2592\u2592\u2592
+            \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593  \u2593\u2591\u2592\u2588\u2591
+
+              F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+  * env.readTextFile("/path/to/data")
+  * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+      """
+    // scalastyle:on
+    )
+
+  }
+
+  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..224983b
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.Settings
+
+
+object FlinkShell {
+
+  def main(args: Array[String]) {
+
+    // scopt, command line arguments
+    case class Config(
+        port: Int = -1,
+        host: String = "none",
+        externalJars: Option[Array[String]] = None)
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head ("Flink Scala Shell")
+
+      opt[Int] ('p', "port") action {
+        (x, c) =>
+          c.copy (port = x)
+      } text("port specifies port of running JobManager")
+
+      opt[(String)] ('h',"host") action {
+        case (x, c) =>
+          c.copy (host = x)
+      }  text("host specifies host name of running JobManager")
+
+      opt[(String)] ('a',"addclasspath") action {
+        case (x,c) =>
+          val xArray = x.split(":")
+          c.copy(externalJars = Option(xArray))
+      } text("specifies additional jars to be used in Flink")
+      
+      help("help") text("prints this usage text")
+    }
+
+
+    // parse arguments
+    parser.parse (args, Config () ) match {
+      case Some(config) =>
+        startShell(config.host,config.port,config.externalJars)
+
+      case _ => println("Could not parse program arguments")
+    }
+  }
+
+
+  def startShell(
+      userHost : String, 
+      userPort : Int, 
+      externalJars : Option[Array[String]] = None): Unit ={
+    
+    println("Starting Flink Shell:")
+
+    var cluster: LocalFlinkMiniCluster = null
+
+    // host or port not given by the user: start a local mini cluster and use it instead
+    val (host,port) = if (userHost == "none" || userPort == -1 ) {
+      println("Creating new local server")
+      cluster = new LocalFlinkMiniCluster(new Configuration, false)
+      cluster.start()
+      ("localhost",cluster.getLeaderRPCPort)
+    } else {
+      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
+      (userHost, userPort)
+    }
+    
+    // Flink-specific interpreter loop that submits to the chosen host and port
+    val repl = new FlinkILoop(host, port, externalJars)
+
+    repl.settings = new Settings()
+
+    repl.settings.usejavacp.value = true
+
+    // start scala interpreter shell
+    repl.process(repl.settings)
+
+    //repl.closeInterpreter()
+
+    if (cluster != null) {
+      cluster.stop()
+    }
+
+    println(" good bye ..")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 7648c50..0621351 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -22,7 +22,7 @@ import java.io._
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 271d26c..67aec5a 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -46,6 +46,7 @@ under the License.
 		<module>flink-ml</module>
 		<module>flink-language-binding</module>
 		<module>flink-gelly-scala</module>
+		<module>flink-scala-shell</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->
@@ -71,22 +72,5 @@ under the License.
 				<module>flink-tez</module>
 			</modules>
 		</profile>
-		<profile>
-			<id>scala-2.10</id>
-			<activation>
-
-				<property>
-					<!-- this is the default scala profile -->
-					<name>!scala-2.11</name>
-				</property>
-			</activation>
-			<properties>
-				<scala.version>2.10.4</scala.version>
-				<scala.binary.version>2.10</scala.binary.version>
-			</properties>
-			<modules>
-				<module>flink-scala-shell</module>
-			</modules>
-		</profile>
 	</profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a5ff01..7e90ad6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,7 +370,7 @@ under the License.
 				</property>
 			</activation>
 			<properties>
-				<scala.version>2.11.4</scala.version>
+				<scala.version>2.11.7</scala.version>
 				<scala.binary.version>2.11</scala.binary.version>
 			</properties>
 		</profile>