You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/07 16:11:13 UTC
[1/2] flink git commit: [FLINK-2066][core] Add configuration of delay
between execution retries at job level
Repository: flink
Updated Branches:
refs/heads/master b489c3673 -> 8d62033c2
[FLINK-2066][core] Add configuration of delay between execution retries at job level
This closes #1223
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a437a2b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a437a2b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a437a2b3
Branch: refs/heads/master
Commit: a437a2b396eb473db5e649c3879c34b67fffc943
Parents: b489c36
Author: wangchx <wc...@gmail.com>
Authored: Sat Oct 3 17:53:31 2015 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 7 12:56:46 2015 +0200
----------------------------------------------------------------------
docs/apis/programming_guide.md | 2 ++
.../flink/api/common/ExecutionConfig.java | 25 +++++++++++++++++-
.../java/org/apache/flink/api/common/Plan.java | 7 +++++
.../flink/api/java/ExecutionEnvironment.java | 25 ++++++++++++++++++
.../plantranslate/JobGraphGenerator.java | 1 +
.../apache/flink/runtime/jobgraph/JobGraph.java | 27 ++++++++++++++++++++
.../flink/api/scala/ExecutionEnvironment.scala | 18 ++++++++++++-
.../environment/StreamExecutionEnvironment.java | 24 +++++++++++++++++
.../api/graph/StreamingJobGraphGenerator.java | 15 +++++++++++
.../api/scala/StreamExecutionEnvironment.scala | 17 ++++++++++++
10 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 3959dc9..da141a9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1992,6 +1992,8 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more.
+
- `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner.
- `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers fail to handle a POJO properly.
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index df0248a..9ed3e92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -34,6 +34,7 @@ import java.util.Objects;
* <li>The default parallelism of the program, i.e., how many parallel tasks to use for
* all functions that do not define a specific value directly.</li>
* <li>The number of retries in the case of failed executions.</li>
+ * <li>The delay between delay between execution retries.</li>
* <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
* The default execution mode is {@link ExecutionMode#PIPELINED}</li>
* <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
@@ -92,6 +93,8 @@ public class ExecutionConfig implements Serializable {
private long autoWatermarkInterval = 0;
private boolean timestampsEnabled = false;
+
+ private long executionRetryDelay = -1;
// Serializers and types registered with Kryo and the PojoSerializer
// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
@@ -242,6 +245,13 @@ public class ExecutionConfig implements Serializable {
public int getNumberOfExecutionRetries() {
return numberOfExecutionRetries;
}
+
+ /**
+ * @return The delay between retires.
+ */
+ public long getExecutionRetryDelay() {
+ return executionRetryDelay;
+ }
/**
* Sets the number of times that failed tasks are re-executed. A value of zero
@@ -258,7 +268,20 @@ public class ExecutionConfig implements Serializable {
this.numberOfExecutionRetries = numberOfExecutionRetries;
return this;
}
-
+
+ /**
+ * Sets the delay between executions. A value of {@code -1} indicates that the default value
+ * should be used.
+ * @param executionRetryDelay The number of milliseconds the system will wait to retry.
+ */
+ public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
+ if (executionRetryDelay < -1 ) {
+ throw new IllegalArgumentException(
+ "The delay between reties must be non-negative, or -1 (use system default)");
+ }
+ this.executionRetryDelay = executionRetryDelay;
+ return this;
+ }
/**
* Sets the execution mode to execute the program. The execution mode defines whether
* data exchanges are performed in a batch or on a pipelined manner.
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e0d1eb8..dc8d152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -303,6 +303,13 @@ public class Plan implements Visitable<Operator<?>> {
}
/**
+ * Gets the delay between retry failed task.
+ * @return The delay the system will wait to retry.
+ */
+ public long getExecutionRetryDelay() {
+ return getExecutionConfig().getExecutionRetryDelay();
+ }
+ /**
* Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
* for data types and is specific to a particular data model (record, tuple, Scala, ...)
*
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a596765..01fb15c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -199,6 +199,31 @@ public abstract class ExecutionEnvironment {
}
/**
+ * Sets the delay that failed tasks are re-executed in milliseconds. A value of
+ * zero effectively disables fault tolerance. A value of {@code -1}
+ * indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @param executionRetryDelay
+ * The delay of time the system will wait to re-execute failed
+ * tasks.
+ */
+ public void setExecutionRetryDelay(long executionRetryDelay) {
+ config.setExecutionRetryDelay(executionRetryDelay);
+ }
+
+ /**
+ * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+ * A value of {@code -1} indicates that the system default value (as defined
+ * in the configuration) should be used.
+ *
+ * @return The delay time the system will wait to re-execute failed tasks.
+ */
+ public long getExecutionRetryDelay() {
+ return config.getExecutionRetryDelay();
+ }
+
+ /**
* Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
*
* @return The execution result from the latest job execution.
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c15e47a..afd0682 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -219,6 +219,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// create the job graph object
JobGraph graph = new JobGraph(jobId, program.getJobName());
graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
+ graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay());
graph.setAllowQueuedScheduling(false);
graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e4a0209..4014a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -79,6 +79,8 @@ public class JobGraph implements Serializable {
/** The number of times that failed tasks should be re-executed */
private int numExecutionRetries;
+
+ private long executionRetryDelay;
/** The number of seconds after which the corresponding ExecutionGraph is removed at the
* job manager after it has been executed. */
@@ -211,6 +213,31 @@ public class JobGraph implements Serializable {
public int getNumberOfExecutionRetries() {
return numExecutionRetries;
}
+
+ /**
+ * Gets the delay of time the system will try to re-execute failed tasks. A value of
+ * {@code -1} indicates the system default value (as defined in the configuration)
+ * should be used.
+ * @return The delay of time in milliseconds the system will try to re-execute failed tasks.
+ */
+ public long getExecutionRetryDelay() {
+ return executionRetryDelay;
+ }
+
+ /**
+ * Sets the delay that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param executionRetryDelay The delay of time the system will wait to re-execute failed tasks.
+ */
+ public void setExecutionRetryDelay(long executionRetryDelay){
+ if (executionRetryDelay < -1) {
+ throw new IllegalArgumentException(
+ "The delay between reties must be non-negative, or -1 (use system default)");
+ }
+ this.executionRetryDelay = executionRetryDelay;
+ }
/**
* Gets the timeout after which the corresponding ExecutionGraph is removed at the
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 3427225..e27d55a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -60,7 +60,7 @@ import scala.reflect.ClassTag
* - [[ExecutionEnvironment#createRemoteEnvironment]]
*
* Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending
- * on where the program is executed. If it is run inside an IDE a loca environment will be
+ * on where the program is executed. If it is run inside an IDE a local environment will be
* created. If the program is submitted to a cluster a remote execution environment will
* be created.
*/
@@ -110,6 +110,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
/**
+ * Sets the delay that failed tasks are re-executed. A value of
+ * zero effectively disables fault tolerance. A value of "-1"
+ * indicates that the system default value (as defined in the configuration)
+ * should be used.
+ */
+ def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+ javaEnv.setExecutionRetryDelay(executionRetryDelay)
+ }
+
+ /**
+ * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+ * A value of "-1" indicates that the system default value (as defined
+ * in the configuration) should be used.
+ */
+ def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+ /**
* Gets the UUID by which this environment is identified. The UUID sets the execution context
* in the cluster or local environment.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 598d0df..c2e2880 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -414,6 +414,30 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Sets the delay that failed tasks are re-executed. A value of
+ * zero effectively disables fault tolerance. A value of {@code -1}
+ * indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @param executionRetryDelay
+ * The delay of time the system will wait to re-execute failed
+ * tasks.
+ */
+ public void setExecutionRetryDelay(long executionRetryDelay){
+ config.setExecutionRetryDelay(executionRetryDelay);
+ }
+
+ /**
+ * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+ * A value of {@code -1} indicates that the system default value (as defined
+ * in the configuration) should be used.
+ *
+ * @return The delay time the system will wait to re-execute failed tasks.
+ */
+ public long getExecutionRetryDelay(){
+ return config.getExecutionRetryDelay();
+ }
+ /**
* Sets the default parallelism that will be used for the local execution
* environment created by {@link #createLocalEnvironment()}.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8eb91a2..d8e81cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,6 +99,8 @@ public class StreamingJobGraphGenerator {
configureCheckpointing();
configureExecutionRetries();
+
+ configureExecutionRetryDelay();
try {
InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
@@ -419,6 +421,10 @@ public class StreamingJobGraphGenerator {
if(executionRetries == -1) {
streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
}
+ long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+ if(executionRetryDelay == -1) {
+ streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
+ }
}
}
@@ -431,4 +437,13 @@ public class StreamingJobGraphGenerator {
jobGraph.setNumberOfExecutionRetries(0);
}
}
+
+ private void configureExecutionRetryDelay() {
+ long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+ if (executionRetryDelay != -1) {
+ jobGraph.setExecutionRetryDelay(executionRetryDelay);
+ } else {
+ jobGraph.setExecutionRetryDelay(100 * 1000);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2474d8c..7492e48 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -210,6 +210,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
+ /**
+ * Sets the delay that failed tasks are re-executed. A value of
+ * zero effectively disables fault tolerance. A value of "-1"
+ * indicates that the system default value (as defined in the configuration)
+ * should be used.
+ */
+ def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+ javaEnv.setExecutionRetryDelay(executionRetryDelay)
+ }
+
+ /**
+ * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+ * A value of "-1" indicates that the system default value (as defined
+ * in the configuration) should be used.
+ */
+ def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+
// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
[2/2] flink git commit: [FLINK-2767] [scala shell] Add Scala 2.11
support to Scala shell. Update Scala 2.11 version and jline dependency.
Posted by fh...@apache.org.
[FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell.
Update Scala 2.11 version and jline dependency.
This closes #1197
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d62033c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d62033c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d62033c
Branch: refs/heads/master
Commit: 8d62033c23f50ac1c8ccca04b70c4fda1b8ba46c
Parents: a437a2b
Author: Chiwan Park <ch...@apache.org>
Authored: Sat Sep 26 02:47:06 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 7 12:57:40 2015 +0200
----------------------------------------------------------------------
flink-dist/pom.xml | 29 +--
flink-staging/flink-scala-shell/pom.xml | 13 +-
.../apache/flink/api/scala/ILoopCompat.scala | 29 +++
.../apache/flink/api/scala/ILoopCompat.scala | 31 +++
.../org.apache.flink/api/scala/FlinkILoop.scala | 218 ------------------
.../org.apache.flink/api/scala/FlinkShell.scala | 108 ---------
.../org/apache/flink/api/scala/FlinkILoop.scala | 224 +++++++++++++++++++
.../org/apache/flink/api/scala/FlinkShell.scala | 107 +++++++++
.../flink/api/scala/ScalaShellITSuite.scala | 2 +-
flink-staging/pom.xml | 18 +-
pom.xml | 2 +-
11 files changed, 407 insertions(+), 374 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 32059ea..f1745ed 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -125,34 +125,17 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala-shell</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<!-- See main pom.xml for explanation of profiles -->
<profiles>
<profile>
- <id>scala-2.10</id>
- <activation>
-
- <property>
- <!-- this is the default scala profile -->
- <name>!scala-2.11</name>
- </property>
- </activation>
-
- <properties>
- <scala.version>2.10.4</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala-shell</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </profile>
- <profile>
<id>include-yarn</id>
<activation>
<property>
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
index 94718a3..5adb8c6 100644
--- a/flink-staging/flink-scala-shell/pom.xml
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -76,12 +76,6 @@ under the License.
<version>${scala.version}</version>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>jline</artifactId>
- <version>2.10.4</version>
- </dependency>
-
<!-- tests -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -180,6 +174,7 @@ under the License.
<configuration>
<sources>
<source>src/main/scala</source>
+ <source>src/main/scala-${scala.binary.version}</source>
</sources>
</configuration>
</execution>
@@ -274,6 +269,12 @@ under the License.
<artifactId>quasiquotes_${scala.binary.version}</artifactId>
<version>${scala.macros.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>jline</artifactId>
+ <version>2.10.4</version>
+ </dependency>
</dependencies>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..797b420
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoop(in0, out0) {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
new file mode 100644
index 0000000..c1be6db
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.BufferedReader
+
+import _root_.scala.tools.nsc.interpreter._
+
+class ILoopCompat(
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoop(in0, out0) {
+
+ protected def addThunk(f: => Unit): Unit = f
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
deleted file mode 100644
index cd8a846..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.{BufferedReader, File, FileOutputStream}
-
-import scala.tools.nsc.interpreter._
-
-import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
-import org.apache.flink.util.AbstractID
-
-
-class FlinkILoop(
- val host: String,
- val port: Int,
- val externalJars: Option[Array[String]],
- in0: Option[BufferedReader],
- out0: JPrintWriter)
- extends ILoop(in0, out0) {
-
-
-
- def this(host:String,
- port:Int,
- externalJars : Option[Array[String]],
- in0: BufferedReader,
- out: JPrintWriter){
- this(host:String, port:Int, externalJars, Some(in0), out)
- }
-
- def this(host:String, port:Int, externalJars : Option[Array[String]]){
- this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
- }
-
- def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
- this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
- }
- // remote environment
- private val remoteEnv: ScalaShellRemoteEnvironment = {
- // allow creation of environments
- ScalaShellRemoteEnvironment.resetContextEnvironments()
-
- // create our environment that submits against the cluster (local or remote)
- val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
-
- // prevent further instantiation of environments
- ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
-
- remoteEnv
- }
-
- // local environment
- val scalaEnv: ExecutionEnvironment = {
- val scalaEnv = new ExecutionEnvironment(remoteEnv)
- scalaEnv
- }
-
- addThunk {
- intp.beQuietDuring {
- // automatically imports the flink scala api
- intp.addImports("org.apache.flink.api.scala._")
- intp.addImports("org.apache.flink.api.common.functions._")
- // with this we can access this object in the scala shell
- intp.bindValue("env", this.scalaEnv)
- }
- }
-
-
- /**
- * creates a temporary directory to store compiled console files
- */
- private val tmpDirBase: File = {
- // get unique temporary folder:
- val abstractID: String = new AbstractID().toString
- val tmpDir: File = new File(
- System.getProperty("java.io.tmpdir"),
- "scala_shell_tmp-" + abstractID)
- if (!tmpDir.exists) {
- tmpDir.mkdir
- }
- tmpDir
- }
-
- // scala_shell commands
- private val tmpDirShell: File = {
- new File(tmpDirBase, "scala_shell_commands")
- }
-
- // scala shell jar file name
- private val tmpJarShell: File = {
- new File(tmpDirBase, "scala_shell_commands.jar")
- }
-
-
- /**
- * Packages the compiled classes of the current shell session into a Jar file for execution
- * on a Flink cluster.
- *
- * @return The path of the created Jar file
- */
- def writeFilesToDisk(): File = {
- val vd = intp.virtualDirectory
-
- val vdIt = vd.iterator
-
- for (fi <- vdIt) {
- if (fi.isDirectory) {
-
- val fiIt = fi.iterator
-
- for (f <- fiIt) {
-
- // directory for compiled line
- val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
- lineDir.mkdirs()
-
- // compiled classes for commands from shell
- val writeFile = new File(lineDir.getAbsolutePath, f.name)
- val outputStream = new FileOutputStream(writeFile)
- val inputStream = f.input
-
- // copy file contents
- org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
-
- inputStream.close()
- outputStream.close()
- }
- }
- }
-
- val compiledClasses = new File(tmpDirShell.getAbsolutePath)
-
- val jarFilePath = new File(tmpJarShell.getAbsolutePath)
-
- val jh: JarHelper = new JarHelper
- jh.jarDir(compiledClasses, jarFilePath)
-
- jarFilePath
- }
-
- /**
- * CUSTOM START METHODS OVERRIDE:
- */
- override def prompt = "Scala-Flink> "
-
- /**
- * custom welcome message
- */
- override def printWelcome() {
- echo(
- // scalastyle:off
- """
- \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
- \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
- \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592
- \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588
- \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592
- \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588
- \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
- \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
- \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
- \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
- \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
- \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
- \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
- \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591
- \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
- \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592
- \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592
- \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588
- \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588
-\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593
-\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593
-\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593
-\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
- \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592
- \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593
- \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
- \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588
- \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593
- \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
- \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
- \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592
- \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591
-
- F L I N K - S C A L A - S H E L L
-
-NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
- * env.readTextFile("/path/to/data")
- * env.execute("Program name")
-
-HINT: You can use print() on a DataSet to print the contents to this shell.
- """
- // scalastyle:on
- )
-
- }
-
- def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
deleted file mode 100644
index a4fae91..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-
-import scala.tools.nsc.Settings
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-
-
-object FlinkShell {
-
- def main(args: Array[String]) {
-
- // scopt, command line arguments
- case class Config(
- port: Int = -1,
- host: String = "none",
- externalJars: Option[Array[String]] = None)
- val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
- head ("Flink Scala Shell")
-
- opt[Int] ('p', "port") action {
- (x, c) =>
- c.copy (port = x)
- } text("port specifies port of running JobManager")
-
- opt[(String)] ('h',"host") action {
- case (x, c) =>
- c.copy (host = x)
- } text("host specifies host name of running JobManager")
-
- opt[(String)] ('a',"addclasspath") action {
- case (x,c) =>
- val xArray = x.split(":")
- c.copy(externalJars = Option(xArray))
- } text("specifies additional jars to be used in Flink")
-
- help("help") text("prints this usage text")
- }
-
-
- // parse arguments
- parser.parse (args, Config () ) match {
- case Some(config) =>
- startShell(config.host,config.port,config.externalJars)
-
- case _ => println("Could not parse program arguments")
- }
- }
-
-
- def startShell(
- userHost : String,
- userPort : Int,
- externalJars : Option[Array[String]] = None): Unit ={
-
- println("Starting Flink Shell:")
-
- var cluster: LocalFlinkMiniCluster = null
-
- // either port or userhost not specified by user, create new minicluster
- val (host,port) = if (userHost == "none" || userPort == -1 ) {
- println("Creating new local server")
- cluster = new LocalFlinkMiniCluster(new Configuration, false)
- cluster.start()
- ("localhost",cluster.getLeaderRPCPort)
- } else {
- println(s"Connecting to remote server (host: $userHost, port: $userPort).")
- (userHost, userPort)
- }
-
- // custom shell
- val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
- repl.settings = new Settings()
-
- repl.settings.usejavacp.value = true
-
- // start scala interpreter shell
- repl.process(repl.settings)
-
- //repl.closeInterpreter()
-
- if (cluster != null) {
- cluster.stop()
- }
-
- println(" good bye ..")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..bcf9bc2
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
+import org.apache.flink.util.AbstractID
+
+import scala.tools.nsc.interpreter._
+
+
+class FlinkILoop(
+ val host: String,
+ val port: Int,
+ val externalJars: Option[Array[String]],
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoopCompat(in0, out0) {
+
+ def this(host:String,
+ port:Int,
+ externalJars : Option[Array[String]],
+ in0: BufferedReader,
+ out: JPrintWriter){
+ this(host:String, port:Int, externalJars, Some(in0), out)
+ }
+
+ def this(host:String, port:Int, externalJars : Option[Array[String]]){
+ this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+ }
+
+ def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
+ this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
+ }
+
+ // remote environment
+ private val remoteEnv: ScalaShellRemoteEnvironment = {
+ // allow creation of environments
+ ScalaShellRemoteEnvironment.resetContextEnvironments()
+
+ // create our environment that submits against the cluster (local or remote)
+ val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+
+ // prevent further instantiation of environments
+ ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
+
+ remoteEnv
+ }
+
+ // local environment
+ val scalaEnv: ExecutionEnvironment = {
+ val scalaEnv = new ExecutionEnvironment(remoteEnv)
+ scalaEnv
+ }
+
+ /**
+ * creates a temporary directory to store compiled console files
+ */
+ private val tmpDirBase: File = {
+ // get unique temporary folder:
+ val abstractID: String = new AbstractID().toString
+ val tmpDir: File = new File(
+ System.getProperty("java.io.tmpdir"),
+ "scala_shell_tmp-" + abstractID)
+ if (!tmpDir.exists) {
+ tmpDir.mkdir
+ }
+ tmpDir
+ }
+
+ // scala_shell commands
+ private val tmpDirShell: File = {
+ new File(tmpDirBase, "scala_shell_commands")
+ }
+
+ // scala shell jar file name
+ private val tmpJarShell: File = {
+ new File(tmpDirBase, "scala_shell_commands.jar")
+ }
+
+ private val packageImports = Seq[String](
+ "org.apache.flink.api.scala._",
+ "org.apache.flink.api.common.functions._"
+ )
+
+ override def createInterpreter(): Unit = {
+ super.createInterpreter()
+
+ addThunk {
+ intp.beQuietDuring {
+ // import dependencies
+ intp.interpret("import " + packageImports.mkString(", "))
+
+ // set execution environment
+ intp.bind("env", this.scalaEnv)
+ }
+ }
+ }
+
+ /**
+ * Packages the compiled classes of the current shell session into a Jar file for execution
+ * on a Flink cluster.
+ *
+ * @return The path of the created Jar file
+ */
+ def writeFilesToDisk(): File = {
+ val vd = intp.virtualDirectory
+
+ val vdIt = vd.iterator
+
+ for (fi <- vdIt) {
+ if (fi.isDirectory) {
+
+ val fiIt = fi.iterator
+
+ for (f <- fiIt) {
+
+ // directory for compiled line
+ val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
+ lineDir.mkdirs()
+
+ // compiled classes for commands from shell
+ val writeFile = new File(lineDir.getAbsolutePath, f.name)
+ val outputStream = new FileOutputStream(writeFile)
+ val inputStream = f.input
+
+ // copy file contents
+ org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+
+ inputStream.close()
+ outputStream.close()
+ }
+ }
+ }
+
+ val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+
+ val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+ val jh: JarHelper = new JarHelper
+ jh.jarDir(compiledClasses, jarFilePath)
+
+ jarFilePath
+ }
+
+ /**
+ * CUSTOM START METHODS OVERRIDE:
+ */
+ override def prompt = "Scala-Flink> "
+
+ /**
+ * custom welcome message
+ */
+ override def printWelcome() {
+ echo(
+ // scalastyle:off
+ """
+ \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
+ \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
+ \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592
+ \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588
+ \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592
+ \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588
+ \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
+ \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
+ \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
+ \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
+ \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
+ \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
+ \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
+ \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591
+ \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
+ \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592
+ \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592
+ \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588
+ \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588
+\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593
+\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593
+\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593
+\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
+ \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592
+ \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593
+ \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
+ \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588
+ \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593
+ \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
+ \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
+ \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592
+ \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591
+
+ F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+ * env.readTextFile("/path/to/data")
+ * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+ """
+ // scalastyle:on
+ )
+
+ }
+
+ def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..224983b
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.Settings
+
+
+object FlinkShell {
+
+ def main(args: Array[String]) {
+
+ // scopt, command line arguments
+ case class Config(
+ port: Int = -1,
+ host: String = "none",
+ externalJars: Option[Array[String]] = None)
+ val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+ head ("Flink Scala Shell")
+
+ opt[Int] ('p', "port") action {
+ (x, c) =>
+ c.copy (port = x)
+ } text("port specifies port of running JobManager")
+
+ opt[(String)] ('h',"host") action {
+ case (x, c) =>
+ c.copy (host = x)
+ } text("host specifies host name of running JobManager")
+
+ opt[(String)] ('a',"addclasspath") action {
+ case (x,c) =>
+ val xArray = x.split(":")
+ c.copy(externalJars = Option(xArray))
+ } text("specifies additional jars to be used in Flink")
+
+ help("help") text("prints this usage text")
+ }
+
+
+ // parse arguments
+ parser.parse (args, Config () ) match {
+ case Some(config) =>
+ startShell(config.host,config.port,config.externalJars)
+
+ case _ => println("Could not parse program arguments")
+ }
+ }
+
+
+ def startShell(
+ userHost : String,
+ userPort : Int,
+ externalJars : Option[Array[String]] = None): Unit ={
+
+ println("Starting Flink Shell:")
+
+ var cluster: LocalFlinkMiniCluster = null
+
+ // either port or userhost not specified by user, create new minicluster
+ val (host,port) = if (userHost == "none" || userPort == -1 ) {
+ println("Creating new local server")
+ cluster = new LocalFlinkMiniCluster(new Configuration, false)
+ cluster.start()
+ ("localhost",cluster.getLeaderRPCPort)
+ } else {
+ println(s"Connecting to remote server (host: $userHost, port: $userPort).")
+ (userHost, userPort)
+ }
+
+ // custom shell
+ val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
+
+ repl.settings = new Settings()
+
+ repl.settings.usejavacp.value = true
+
+ // start scala interpreter shell
+ repl.process(repl.settings)
+
+ //repl.closeInterpreter()
+
+ if (cluster != null) {
+ cluster.stop()
+ }
+
+ println(" good bye ..")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index 7648c50..0621351 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -22,7 +22,7 @@ import java.io._
import java.util.concurrent.TimeUnit
import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment}
+import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 271d26c..67aec5a 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -46,6 +46,7 @@ under the License.
<module>flink-ml</module>
<module>flink-language-binding</module>
<module>flink-gelly-scala</module>
+ <module>flink-scala-shell</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->
@@ -71,22 +72,5 @@ under the License.
<module>flink-tez</module>
</modules>
</profile>
- <profile>
- <id>scala-2.10</id>
- <activation>
-
- <property>
- <!-- this is the default scala profile -->
- <name>!scala-2.11</name>
- </property>
- </activation>
- <properties>
- <scala.version>2.10.4</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- </properties>
- <modules>
- <module>flink-scala-shell</module>
- </modules>
- </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a5ff01..7e90ad6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,7 +370,7 @@ under the License.
</property>
</activation>
<properties>
- <scala.version>2.11.4</scala.version>
+ <scala.version>2.11.7</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
</profile>