You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/05/28 15:49:00 UTC
[3/3] flink git commit: [FLINK-1907] Add Flink Scala Shell
[FLINK-1907] Add Flink Scala Shell
This can either be used to connect to a Flink cluster or in local mode
with a cluster in the same JVM.
This only works for Scala 2.10 for now because the Scala REPL was
modified in 2.11.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/717f8812
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/717f8812
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/717f8812
Branch: refs/heads/master
Commit: 717f8812d8aa4c8a813e4ff1c3d36127ec1d87b8
Parents: 7f5edeb
Author: fleischhauf <ni...@googlemail.com>
Authored: Tue May 12 21:54:20 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu May 28 15:48:17 2015 +0200
----------------------------------------------------------------------
docs/scala_shell_quickstart.md | 72 +++++
flink-dist/pom.xml | 23 ++
flink-dist/src/main/assemblies/bin.xml | 7 +
.../flink/api/java/RemoteEnvironment.java | 10 +
flink-staging/flink-scala-shell/pom.xml | 281 +++++++++++++++++++
.../org.apache.flink/api/java/JarHelper.java | 211 ++++++++++++++
.../api/java/ScalaShellRemoteEnvironment.java | 83 ++++++
.../org.apache.flink/api/scala/FlinkILoop.scala | 202 +++++++++++++
.../org.apache.flink/api/scala/FlinkShell.scala | 96 +++++++
.../src/test/resources/log4j-test.properties | 24 ++
.../src/test/resources/logback-test.xml | 29 ++
.../flink/api/scala/ScalaShellITSuite.scala | 208 ++++++++++++++
.../start-script/start-scala-shell.sh | 58 ++++
.../util/StreamingMultipleProgramsTestBase.java | 2 +-
flink-staging/pom.xml | 17 ++
.../flink/test/util/AbstractTestBase.java | 6 +-
.../test/util/MultipleProgramsTestBase.java | 2 +-
.../apache/flink/test/util/TestBaseUtils.java | 9 +-
.../apache/flink/test/util/TestEnvironment.java | 2 +-
.../apache/flink/test/util/FlinkTestBase.scala | 2 +-
20 files changed, 1333 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/docs/scala_shell_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/scala_shell_quickstart.md b/docs/scala_shell_quickstart.md
new file mode 100644
index 0000000..4ad7f5b
--- /dev/null
+++ b/docs/scala_shell_quickstart.md
@@ -0,0 +1,72 @@
+---
+title: "Quickstart: Scala Shell"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+Start working on your Flink Scala program in a few simple steps.
+
+## Startup Flink interactive Scala shell
+
+Flink has an integrated interactive Scala shell.
+It can be used in a local setup as well as in a cluster setup.
+
+To use it in a local setup just execute:
+
+__Sample Input__:
+~~~bash
+flink/bin/start-scala-shell.sh
+~~~
+
+And it will initialize a local JobManager by itself.
+
+To use it in a cluster setup you can supply the host and port of the JobManager with:
+
+__Sample Input__:
+~~~bash
+flink/bin/start-scala-shell.sh --host "<hostname>" --port <portnumber>
+~~~
+
+
+## Usage
+
+The shell prebinds the ExecutionEnvironment as "env". So far, only batch mode is supported.
+
+The following example will execute the wordcount program in the Scala shell:
+
+~~~scala
+Scala-Flink> val text = env.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
+Scala-Flink> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
+Scala-Flink> counts.print()
+~~~
+
+
+The print() command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
+
+It is possible to write results to a file, as in the standard Scala API. However, in this case you need to call `execute` to run your program:
+
+~~~scala
+Scala-Flink> env.execute("MyProgram")
+~~~
+
+The Flink Shell comes with command history and autocompletion.
+
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index de9ed8a..ae9e8c9 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -123,6 +123,29 @@ under the License.
<!-- See main pom.xml for explanation of profiles -->
<profiles>
<profile>
+ <id>scala-2.10</id>
+ <activation>
+
+ <property>
+ <!-- this is the default scala profile -->
+ <name>!scala-2.11</name>
+ </property>
+ </activation>
+
+ <properties>
+ <scala.version>2.10.4</scala.version>
+ <scala.binary.version>2.10</scala.binary.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala-shell</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>include-yarn</id>
<activation>
<property>
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 6a429ee..802799c 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -50,6 +50,12 @@ under the License.
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
+ <!-- flink scala shell-->
+ <fileSet>
+ <directory>../flink-staging/flink-scala-shell/start-script/</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>755</fileMode>
+ </fileSet>
<fileSet>
<!-- copy yarn start scripts -->
<directory>src/main/flink-bin/yarn-bin</directory>
@@ -114,6 +120,7 @@ under the License.
</excludes>
</fileSet>
+
<fileSet>
<!-- copy jar files of java examples -->
<directory>../flink-examples/flink-java-examples/target</directory>
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 515037c..a2f2891 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -87,4 +87,14 @@ public class RemoteEnvironment extends ExecutionEnvironment {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
}
+
+
+  /**
+   * Returns the port this environment was configured with.
+   * Needed by ScalaShellRemoteEnvironment, which creates its own executor in execute().
+   *
+   * @return the configured port
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Returns the host this environment was configured with.
+   *
+   * @return the configured host name or address
+   */
+  public String getHost() {
+    return host;
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml
new file mode 100644
index 0000000..7a5b485
--- /dev/null
+++ b/flink-staging/flink-scala-shell/pom.xml
@@ -0,0 +1,281 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-staging</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-scala-shell</artifactId>
+ <name>flink-scala-shell</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- scala command line parsing -->
+ <dependency>
+ <groupId>com.github.scopt</groupId>
+ <artifactId>scopt_${scala.binary.version}</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>jline</artifactId>
+ <version>2.10.4</version>
+ </dependency>
+
+ <!-- tests -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ <compilerPlugins combine.children="append">
+ <compilerPlugin>
+ <groupId>org.scalamacros</groupId>
+ <artifactId>paradise_${scala.version}</artifactId>
+ <version>${scala.macros.version}</version>
+ </compilerPlugin>
+ </compilerPlugins>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>1.0</version>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <stdout>W</stdout> <!-- Skip coloring output -->
+ </configuration>
+ <executions>
+ <execution>
+ <id>scala-test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <suffixes>(?<!(IT|Integration))(Test|Suite|Case)</suffixes>
+ <argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-test</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
+ <argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>scala-2.10</id>
+ <activation>
+ <property>
+ <!-- this is the default scala profile -->
+ <name>!scala-2.11</name>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.scalamacros</groupId>
+ <artifactId>quasiquotes_${scala.binary.version}</artifactId>
+ <version>${scala.macros.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
new file mode 100644
index 0000000..5def4b0
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import java.util.jar.JarOutputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+/**
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <pc...@bea.com>
+ */
+public class JarHelper
+{
+  // ========================================================================
+  // Constants
+
+  private static final int BUFFER_SIZE = 2156;
+
+  // separator used inside jar entry names, independent of the platform
+  private static final char SEP = '/';
+
+  // ========================================================================
+  // Variables
+
+  // copy buffer shared by all jar operations of this instance (hence not threadsafe)
+  private byte[] mBuffer = new byte[BUFFER_SIZE];
+  private int mByteCount = 0;
+  private boolean mVerbose = false;
+  // canonical path of the jar currently being written, so that it is never added to itself
+  private String mDestJarName = "";
+
+  // ========================================================================
+  // Constructor
+
+  /**
+   * Instantiates a new JarHelper.
+   */
+  public JarHelper() {}
+
+  // ========================================================================
+  // Public methods
+
+  /**
+   * Jars a given directory or single file into the given destination jar file.
+   *
+   * @param dirOrFile2Jar directory (jarred recursively) or single file to add
+   * @param destJar jar file to create; it is skipped if it lies inside the jarred directory
+   * @throws IllegalArgumentException if one of the arguments is null
+   * @throws IOException if reading the input or writing the jar fails
+   */
+  public void jarDir(File dirOrFile2Jar, File destJar)
+    throws IOException {
+
+    if (dirOrFile2Jar == null || destJar == null) {
+      throw new IllegalArgumentException();
+    }
+
+    mDestJarName = destJar.getCanonicalPath();
+    FileOutputStream fout = new FileOutputStream(destJar);
+    try {
+      JarOutputStream jout = new JarOutputStream(fout);
+      //jout.setLevel(0);
+      try {
+        jarDir(dirOrFile2Jar, jout, null);
+      } finally {
+        jout.close();
+      }
+    } finally {
+      // also releases the file handle when creating or closing the jar stream failed
+      fout.close();
+    }
+  }
+
+  /**
+   * Unjars a given jar file into a given directory.
+   *
+   * @param jarFile jar file to extract
+   * @param destDir directory the entries are extracted into
+   * @throws IOException if reading the jar or writing an entry fails
+   */
+  public void unjarDir(File jarFile, File destDir) throws IOException {
+    FileInputStream fis = new FileInputStream(jarFile);
+    try {
+      unjar(fis, destDir);
+    } finally {
+      fis.close();
+    }
+  }
+
+  /**
+   * Given an InputStream on a jar file, unjars the contents into the given
+   * directory. The stream is closed when this method returns.
+   *
+   * @param in stream positioned at the start of a jar file
+   * @param destDir directory the entries are extracted into
+   * @throws IOException if reading or writing fails, or if an entry name would
+   *                     place a file outside of destDir
+   */
+  public void unjar(InputStream in, File destDir) throws IOException {
+    JarInputStream jis = new JarInputStream(in);
+    try {
+      // a null destDir means "relative to the working directory" for new File(destDir, name)
+      String destRoot = (destDir == null ? new File(".") : destDir).getCanonicalPath();
+      String destPrefix = destRoot.endsWith(File.separator) ? destRoot : destRoot + File.separator;
+      byte[] data = new byte[BUFFER_SIZE];
+      JarEntry entry;
+      while ((entry = jis.getNextJarEntry()) != null) {
+        File destFile = new File(destDir, entry.getName());
+
+        // refuse entries such as "../../x" that would be written outside of destDir
+        String target = destFile.getCanonicalPath();
+        if (!target.equals(destRoot) && !target.startsWith(destPrefix)) {
+          throw new IOException("Jar entry " + entry.getName() +
+            " would be extracted outside of " + destDir);
+        }
+
+        if (entry.isDirectory()) {
+          // mkdirs: the jar need not contain an entry for every parent directory
+          destFile.mkdirs();
+          if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
+          continue;
+        }
+        if (mVerbose) {
+          System.out.println("unjarring " + destFile +
+            " from " + entry.getName());
+        }
+        File parent = destFile.getParentFile();
+        if (parent != null) {
+          parent.mkdirs();
+        }
+        BufferedOutputStream dest =
+          new BufferedOutputStream(new FileOutputStream(destFile), BUFFER_SIZE);
+        try {
+          int count;
+          while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+            dest.write(data, 0, count);
+          }
+          dest.flush();
+        } finally {
+          dest.close();
+        }
+        if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
+      }
+    } finally {
+      jis.close();
+    }
+  }
+
+  public void setVerbose(boolean b) {
+    mVerbose = b;
+  }
+
+  // ========================================================================
+  // Private methods
+
+  /**
+   * Recursively jars up the given path under the given directory.
+   *
+   * @param dirOrFile2jar file or directory to add
+   * @param jos jar stream the entries are written to
+   * @param path entry-name prefix of the parent directory; null for the root
+   *             that was handed to the public jarDir, whose own name is not
+   *             part of the entry names
+   */
+  private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+    throws IOException {
+    if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
+    if (dirOrFile2jar.isDirectory()) {
+      String[] dirList = dirOrFile2jar.list();
+      if (dirList == null) {
+        // File.list() returns null when the directory cannot be read
+        throw new IOException("Could not list directory " + dirOrFile2jar);
+      }
+      String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
+      if (path != null) {
+        JarEntry je = new JarEntry(subPath);
+        je.setTime(dirOrFile2jar.lastModified());
+        jos.putNextEntry(je);
+        jos.flush();
+        jos.closeEntry();
+      }
+      for (int i = 0; i < dirList.length; i++) {
+        File f = new File(dirOrFile2jar, dirList[i]);
+        jarDir(f,jos,subPath);
+      }
+    } else {
+      if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
+        if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
+        return;
+      }
+
+      if (mVerbose) {
+        System.out.println("adding " + dirOrFile2jar.getPath());
+      }
+      // path is null when a single file (not a directory) is jarred; without this
+      // guard the entry would be named "null<filename>"
+      String entryName = (path == null ? "" : path) + dirOrFile2jar.getName();
+      FileInputStream fis = new FileInputStream(dirOrFile2jar);
+      try {
+        JarEntry entry = new JarEntry(entryName);
+        entry.setTime(dirOrFile2jar.lastModified());
+        jos.putNextEntry(entry);
+        while ((mByteCount = fis.read(mBuffer)) != -1) {
+          jos.write(mBuffer, 0, mByteCount);
+          if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
+        }
+        jos.flush();
+        jos.closeEntry();
+      } finally {
+        fis.close();
+      }
+    }
+  }
+
+  // for debugging
+  public static void main(String[] args)
+    throws IOException
+  {
+    if (args.length < 2) {
+      System.err.println("Usage: JarHelper jarname.jar directory");
+      return;
+    }
+
+    JarHelper jarHelper = new JarHelper();
+    jarHelper.mVerbose = true;
+
+    File destJar = new File(args[0]);
+    File dirOrFile2Jar = new File(args[1]);
+
+    jarHelper.jarDir(dirOrFile2Jar, destJar);
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
new file mode 100644
index 0000000..cb470c9
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
@@ -0,0 +1,83 @@
+
+package org.apache.flink.api.java;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+
+import org.apache.flink.api.scala.FlinkILoop;
+
+import java.io.File;
+
+/**
+ * ScalaShellRemoteEnvironment references the JobManager through host and port parameters,
+ * and the Scala Shell (FlinkILoop).
+ * Upon calling execute(), it reads compiled lines of the Scala shell, aggregates them to a Jar
+ * and sends aggregated jar to the JobManager.
+ */
+
+public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
+
+  // the Scala shell whose compiled lines are shipped with every job
+  private FlinkILoop flinkILoop;
+
+  /**
+   * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop
+   *
+   * @param host The host name or address of the master (JobManager), where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
+   * @param jarFiles Jar files handed on to the RemoteEnvironment constructor.
+   */
+  public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
+    super(host, port, jarFiles);
+    this.flinkILoop = flinkILoop;
+  }
+
+  /**
+   * Writes the classes compiled by the shell to disk, packs them into a jar and
+   * executes the current program plan remotely with that jar attached.
+   *
+   * @param jobName name of the job as string
+   * @return Result of the computation
+   * @throws Exception
+   */
+  @Override
+  public JobExecutionResult execute(String jobName) throws Exception {
+    Plan plan = createProgramPlan(jobName);
+
+    JarHelper jarHelper = new JarHelper();
+
+    // write virtual files to disk first
+    flinkILoop.writeFilesToDisk();
+
+    // jar up the directory holding the compiled shell lines
+    File compiledLinesDir = new File(flinkILoop.getTmpDirShell().getAbsolutePath());
+    File shellJar = new File(flinkILoop.getTmpJarShell().getAbsolutePath());
+    jarHelper.jarDir(compiledLinesDir, shellJar);
+
+    // call "traditional" execution methods; only the shell jar is handed to the executor
+    PlanExecutor executor = PlanExecutor.createRemoteExecutor(
+      super.getHost(), super.getPort(), new String[] { shellJar.getAbsolutePath() });
+    executor.setPrintStatusDuringExecution(plan.getExecutionConfig().isSysoutLoggingEnabled());
+    return executor.executePlan(plan);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
new file mode 100644
index 0000000..0de4953
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io.{BufferedReader, File, FileOutputStream}
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter._
+
+import org.apache.flink.api.java.ScalaShellRemoteEnvironment
+import org.apache.flink.util.AbstractID
+
+
+class FlinkILoop(val host: String,
+ val port: Int,
+ in0: Option[BufferedReader],
+ out0: JPrintWriter)
+ extends ILoop(in0, out0) {
+
+ def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
+ this(host:String, port:Int, Some(in0), out)
+ }
+
+ def this(host:String, port:Int){
+ this(host:String,port: Int,None, new JPrintWriter(Console.out, true))
+ }
+ // remote environment
+ private val remoteEnv: ScalaShellRemoteEnvironment = {
+ val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+ remoteEnv
+ }
+
+ // Scala ExecutionEnvironment wrapping the remote environment; bound as "env" in the shell
+ val scalaEnv: ExecutionEnvironment = {
+ val scalaEnv = new ExecutionEnvironment(remoteEnv)
+ scalaEnv
+ }
+
+
+ /**
+ * Imports the Flink Scala API and binds the execution environment as "env" in the interpreter.
+ */
+
+ addThunk {
+ intp.beQuietDuring {
+ // automatically imports the flink scala api
+ intp.addImports("org.apache.flink.api.scala._")
+ intp.addImports("org.apache.flink.api.common.functions._")
+ // with this we can access this object in the scala shell
+ intp.bindValue("env", this.scalaEnv)
+ }
+ }
+
+
+
+  /**
+   * Temporary directory that stores the compiled console files. It is located
+   * below "java.io.tmpdir" and carries a fresh AbstractID in its name.
+   */
+  private val tmpDirBase: File = {
+    // unique folder name for this shell instance
+    val uniqueName = "scala_shell_tmp-" + new AbstractID().toString
+    val dir = new File(System.getProperty("java.io.tmpdir"), uniqueName)
+    if (!dir.exists) {
+      dir.mkdir
+    }
+    dir
+  }
+
+ // scala_shell commands
+ private val tmpDirShell: File = {
+ new File(tmpDirBase, "scala_shell_commands")
+ }
+
+ // scala shell jar file name
+ private val tmpJarShell: File = {
+ new File(tmpDirBase, "scala_shell_commands.jar")
+ }
+
+
+  /**
+   * Writes the contents of the compiled lines that have been executed in the shell
+   * from the interpreter's virtual directory into a "physical directory" below
+   * the shell's temporary directory: one sub-directory per entry of the virtual
+   * directory that is itself a directory, holding a copy of each of its files.
+   */
+  def writeFilesToDisk(): Unit = {
+    // entries of the virtual directory that are not directories are ignored
+    for (lineEntry <- intp.virtualDirectory.iterator if lineEntry.isDirectory) {
+      for (compiledFile <- lineEntry.iterator) {
+
+        // directory for compiled line
+        val lineDir = new File(tmpDirShell.getAbsolutePath, lineEntry.name)
+        lineDir.mkdirs()
+
+        // compiled classes for commands from shell
+        val writeFile = new File(lineDir.getAbsolutePath, compiledFile.name)
+        val outputStream = new FileOutputStream(writeFile)
+        try {
+          val inputStream = compiledFile.input
+          try {
+            // copy file contents
+            org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
+          } finally {
+            inputStream.close()
+          }
+        } finally {
+          // release the file handle even if opening or copying the input failed
+          outputStream.close()
+        }
+      }
+    }
+  }
+
+ /**
+ * custom prompt of the Flink Scala shell
+ */
+ override def prompt = "Scala-Flink> "
+
+ /**
+ * custom welcome message
+ */
+ override def printWelcome() {
+ echo(
+ """
+ ▒▓██▓██▒
+ ▓████▒▒█▓▒▓███▓▒
+ ▓███▓░░ ▒▒▒▓██▒ ▒
+ ░██▒ ▒▒▓▓█▓▓▒░ ▒████
+ ██▒ ░▒▓███▒ ▒█▒█▒
+ ░▓█ ███ ▓░▒██
+ ▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
+ █░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
+ ████░ ▒▓█▓ ██▒▒▒ ▓███▒
+ ░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
+ ▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
+ ███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
+ ░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
+ ███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
+ ██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
+ ▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
+ ▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
+ ▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
+ ██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
+▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
+█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
+██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
+▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
+ ██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
+ ▓█ ▒█▓ ░ █░ ▒█ █▓
+ █▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
+ █▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
+ ██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
+ ▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
+ ░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
+ ░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
+ ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
+
+ F L I N K - S C A L A - S H E L L
+
+NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
+ * env.readTextFile("/path/to/data")
+ * env.execute("Program name")
+
+HINT: You can use print() on a DataSet to print the contents to this shell.
+ """)
+ }
+
+ // getter functions:
+  /** Returns the root temporary folder of this shell instance. */
+  def getTmpDirBase(): File = tmpDirBase
+
+  /** Returns the "scala_shell_commands" folder inside the root temporary folder. */
+  def getTmpDirShell(): File = tmpDirShell
+
+  /** Returns the "scala_shell_commands.jar" file inside the root temporary folder. */
+  def getTmpJarShell(): File = tmpJarShell
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
new file mode 100644
index 0000000..90615ec
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+
+
+
+/**
+ * Command line entry point of the Flink Scala shell.
+ *
+ * Parses the optional `--host` / `--port` arguments and starts a [[FlinkILoop]]
+ * that is connected either to the given JobManager or, if host or port is
+ * missing, to a local mini cluster that is started for the lifetime of the shell.
+ */
+object FlinkShell {
+
+  /**
+   * Parses the program arguments and starts the shell.
+   *
+   * @param args command line arguments (`-p`/`--port`, `-h`/`--host`, `--help`)
+   */
+  def main(args: Array[String]): Unit = {
+
+    // scopt, command line arguments; the defaults mark "not specified by the user"
+    case class Config(port: Int = -1,
+                      host: String = "none")
+
+    // the program name is shown in the usage text, so it names the launcher script
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head("Flink Scala Shell")
+      opt[Int]('p', "port") action {
+        (x, c) =>
+          c.copy(port = x)
+      } text ("port specifies port of running JobManager")
+      opt[String]('h', "host") action {
+        (x, c) =>
+          c.copy(host = x)
+      } text ("host specifies host name of running JobManager")
+      help("help") text ("prints this usage text")
+    }
+
+    // parse arguments
+    parser.parse(args, Config()) match {
+      case Some(config) =>
+        startShell(config.host, config.port)
+      case None =>
+        // arguments are bad, usage message will have been displayed
+        println("Could not parse program arguments")
+    }
+  }
+
+  /**
+   * Starts the interactive shell and blocks until the REPL terminates.
+   *
+   * If either `userHost` is `"none"` or `userPort` is `-1`, a new
+   * [[LocalFlinkMiniCluster]] is created and the shell connects to it on
+   * `localhost`; otherwise the shell connects to the given host and port.
+   * A locally created cluster is always stopped again, even if the REPL fails.
+   *
+   * @param userHost host name of a running JobManager, or `"none"`
+   * @param userPort port of a running JobManager, or `-1`
+   */
+  def startShell(userHost: String, userPort: Int): Unit = {
+    println("Starting Flink Shell:")
+
+    // either port or userhost not specified by user, create new minicluster
+    val cluster: Option[LocalFlinkMiniCluster] =
+      if (userHost == "none" || userPort == -1) {
+        println("Creating new local server")
+        Some(new LocalFlinkMiniCluster(new Configuration, false))
+      } else {
+        println(s"Connecting to remote server (host: $userHost, port: $userPort).")
+        None
+      }
+
+    try {
+      val (host, port) = cluster match {
+        case Some(c) => ("localhost", c.getJobManagerRPCPort)
+        case None => (userHost, userPort)
+      }
+
+      // custom shell
+      val repl = new FlinkILoop(host, port)
+
+      repl.settings = new Settings()
+
+      // let the interpreter see the classes on the JVM classpath
+      repl.settings.usejavacp.value = true
+
+      // start scala interpreter shell
+      repl.process(repl.settings)
+    } finally {
+      // do not leak the local cluster when the REPL terminates abnormally
+      cluster.foreach(_.stop())
+    }
+
+    println(" good bye ..")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties b/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9912b19
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=OFF, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml b/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
new file mode 100644
index 0000000..412592f
--- /dev/null
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+import java.net.URLClassLoader
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.FiniteDuration
+import scala.tools.nsc.Settings
+
+/**
+ * Integration tests for the Scala shell.
+ *
+ * Each test feeds a Scala snippet to a [[FlinkILoop]] that is connected to a
+ * test cluster started once for the whole suite, and then inspects the text
+ * written by the shell and to `System.out`.
+ */
+class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  test("Iteration test with iterative Pi example") {
+
+    val input : String =
+      """
+        val initial = env.fromElements(0)
+
+        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
+          val result = iterationInput.map { i =>
+            val x = Math.random()
+            val y = Math.random()
+            i + (if (x * x + y * y < 1) 1 else 0)
+          }
+          result
+        }
+        val result = count map { c => c / 10000.0 * 4 }
+        result.collect()
+      """.stripMargin
+
+    val output : String = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+  }
+
+  test("WordCount in Shell") {
+    val input = """
+        val text = env.fromElements("To be, or not to be,--that is the question:--",
+        "Whether 'tis nobler in the mind to suffer",
+        "The slings and arrows of outrageous fortune",
+        "Or to take arms against a sea of troubles,")
+
+        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
+        val result = counts.print()
+      """.stripMargin
+
+    val output = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+
+    // some of the words that should be included
+    output should include("(a,1)")
+    output should include("(whether,1)")
+    output should include("(to,4)")
+    output should include("(arrows,1)")
+  }
+
+  test("Sum 1..10, should be 55") {
+    val input : String =
+      """
+        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
+        val reduced = input.reduce(_+_)
+        reduced.print
+      """.stripMargin
+
+    val output : String = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+
+    output should include("55")
+  }
+
+  test("WordCount in Shell with custom case class") {
+    val input : String =
+      """
+        case class WC(word: String, count: Int)
+
+        val wordCounts = env.fromElements(
+          new WC("hello", 1),
+          new WC("world", 2),
+          new WC("world", 8))
+
+        val reduced = wordCounts.groupBy(0).sum(1)
+
+        reduced.print()
+      """.stripMargin
+
+    val output : String = processInShell(input)
+
+    output should not include "failed"
+    output should not include "error"
+    output should not include "Exception"
+
+    output should include("Job execution switched to status FINISHED.")
+
+    output should include("WC(hello,1)")
+    output should include("WC(world,10)")
+  }
+
+
+  /**
+   * Run the input using a Scala Shell and return the output of the shell.
+   *
+   * `System.out` is redirected while the shell runs and is always restored
+   * afterwards, also when the shell throws.
+   *
+   * @param input commands to be processed in the shell
+   * @return output written by the shell followed by everything printed to `System.out`
+   * @throws RuntimeException if the test cluster has not been started
+   */
+  def processInShell(input : String): String = {
+
+    // resolve the cluster first, so a missing cluster fails before stdout is redirected
+    val host = "localhost"
+    val port = cluster match {
+      case Some(c) => c.getJobManagerRPCPort
+      case _ => throw new RuntimeException("Test cluster not initialized.")
+    }
+
+    // hand all file-based entries of the test class loader to the shell
+    val classpath = getClass.getClassLoader match {
+      case urlCl: URLClassLoader =>
+        urlCl.getURLs
+          .filter(_.getProtocol == "file")
+          .map(_.getFile)
+          .mkString(File.pathSeparator)
+      case _ => ""
+    }
+
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val baos = new ByteArrayOutputStream()
+
+    val oldOut = System.out
+    System.setOut(new PrintStream(baos))
+
+    try {
+      val repl = new FlinkILoop(host, port, in, new PrintWriter(out))
+
+      repl.settings = new Settings()
+
+      // enable this line to use scala in intellij
+      repl.settings.usejavacp.value = true
+
+      repl.addedClasspath = classpath
+
+      repl.process(repl.settings)
+
+      repl.closeInterpreter()
+    } finally {
+      // never leave stdout redirected for the tests that run afterwards
+      System.setOut(oldOut)
+    }
+
+    val stdout = baos.toString
+
+    // reprint because ScalaTest fails if we don't
+    print(stdout)
+
+    out.toString + stdout
+  }
+
+  // test cluster shared by all tests of this suite; set in beforeAll
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+  val parallelism = 4
+
+  /** Starts the shared test cluster and registers it as the context environment. */
+  override def beforeAll(): Unit = {
+    val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false, false)
+    val clusterEnvironment = new TestEnvironment(cl, parallelism)
+    clusterEnvironment.setAsContext()
+
+    cluster = Some(cl)
+  }
+
+  /** Stops the shared test cluster if it was started. */
+  override def afterAll(): Unit = {
+    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
new file mode 100644
index 0000000..4bbd49a
--- /dev/null
+++ b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# from scala-lang 2.10.4
+
+# restore stty settings (echo in particular)
+function restoreSttySettings() {
+  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
+    echo "restoring stty:"
+    echo "$saved_stty"
+  fi
+  # intentionally unquoted, as in the scala runner script this was taken from
+  stty $saved_stty
+  saved_stty=""
+}
+
+# restore the terminal (if its settings were saved) and exit with the
+# status recorded in scala_exit_status
+function onExit() {
+  if [[ "$saved_stty" != "" ]]; then
+    restoreSttySettings
+  fi
+  exit "$scala_exit_status"
+}
+
+
+# exit status reported if we are interrupted before the shell has finished
+scala_exit_status=127
+
+# to reenable echo if we are interrupted before completing.
+trap onExit INT
+# save terminal settings
+saved_stty=$(stty -g 2>/dev/null)
+# clear on error so we don't later try to restore them
+if [[ $? -ne 0 ]]; then
+  saved_stty=""
+fi
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+FLINK_CLASSPATH=`constructFlinkClassPath`
+
+# quote "$@" so that arguments containing whitespace are passed on unchanged
+java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell "$@"
+# remember the exit status of the shell so that onExit can propagate it
+scala_exit_status=$?
+
+#restore echo
+onExit
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index 945ac07..a9ebd0b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -76,7 +76,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
@BeforeClass
public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false);
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, true);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index bc7d101..f57f6a8 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -71,5 +71,22 @@ under the License.
<module>flink-tez</module>
</modules>
</profile>
+ <profile>
+ <id>scala-2.10</id>
+ <activation>
+
+ <property>
+ <!-- this is the default scala profile -->
+ <name>!scala-2.11</name>
+ </property>
+ </activation>
+ <properties>
+ <scala.version>2.10.4</scala.version>
+ <scala.binary.version>2.10</scala.binary.version>
+ </properties>
+ <modules>
+ <module>flink-scala-shell</module>
+ </modules>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index a5a7da9..a63f6ac 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -59,8 +59,8 @@ public abstract class AbstractTestBase extends TestBaseUtils {
// Local Test Cluster Life Cycle
// --------------------------------------------------------------------------------------------
- public void startCluster() throws Exception {
- this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false);
+ public void startCluster() throws Exception{
+ this.executor = startCluster(numTaskManagers, taskManagerNumSlots, StreamingMode.BATCH_ONLY, false, true);
}
public void stopCluster() throws Exception {
@@ -142,4 +142,4 @@ public abstract class AbstractTestBase extends TestBaseUtils {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index 8dab485..97d5db0 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -101,7 +101,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
@BeforeClass
public static void setup() throws Exception{
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer);
+ cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.BATCH_ONLY, startWebServer, true);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 29cb2a4..92d66d9 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -97,10 +97,11 @@ public class TestBaseUtils {
}
- protected static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
+ public static ForkableFlinkMiniCluster startCluster(int numTaskManagers,
int taskManagerNumSlots,
StreamingMode mode,
- boolean startWebserver) throws Exception {
+ boolean startWebserver,
+ boolean singleActorSystem) throws Exception {
logDir = File.createTempFile("TestBaseUtils-logdir", null);
Assert.assertTrue("Unable to delete temp file", logDir.delete());
@@ -121,10 +122,10 @@ public class TestBaseUtils {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
- return new ForkableFlinkMiniCluster(config, true, mode);
+ return new ForkableFlinkMiniCluster(config, singleActorSystem, mode);
}
- protected static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+ public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
if (logDir != null) {
FileUtils.deleteDirectory(logDir);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 25f2c83..cd10cc6 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -79,7 +79,7 @@ public class TestEnvironment extends ExecutionEnvironment {
return pc.compile(p);
}
- protected void setAsContext() {
+ public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
http://git-wip-us.apache.org/repos/asf/flink/blob/717f8812/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
index 3ea8624..bac1ede 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/FlinkTestBase.scala
@@ -54,7 +54,7 @@ trait FlinkTestBase extends BeforeAndAfter {
val parallelism = 4
before {
- val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false)
+ val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false, true)
val clusterEnvironment = new TestEnvironment(cl, parallelism)
clusterEnvironment.setAsContext()