You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/09 20:14:51 UTC
incubator-zeppelin git commit: ZEPPELIN-44 Interpreter for Apache
Flink
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master ec7a59c09 -> f0301cdfa
ZEPPELIN-44 Interpreter for Apache Flink
#### Interpreter for [Apache Flink](http://flink.apache.org/).
The Flink community helped a lot in writing the interpreter. Thanks so much! Some code is copied from Flink's development branch. Once Flink releases 0.9, the copied code and the snapshot repository configuration will be removed.
#### Build
If no options are given, the build defaults to Flink 0.9.0-milestone-1.
When used with Zeppelin, it is a good idea to build against 0.9-SNAPSHOT, because it supports .collect(), which makes it much easier to fetch result data and display it in Zeppelin.
So you might want to build like this:
```
mvn package -Dflink.version=0.9-SNAPSHOT -DskipTests
```
#### Screenshot
![image](https://cloud.githubusercontent.com/assets/1540981/7674014/32c0ee68-fd59-11e4-9ee6-5ec0bbf6a8f2.png)
Author: Lee moon soo <mo...@apache.org>
Closes #75 from Leemoonsoo/flink and squashes the following commits:
f08bd25 [Lee moon soo] Update pom.xml after https://github.com/apache/incubator-zeppelin/pull/88
460cf46 [Lee moon soo] jarr up -> jar up
501efb3 [Lee moon soo] Add scalastyle
e69e5ba [Lee moon soo] Add license
7be1f90 [Lee moon soo] Add apache snapshot repo
ebbd0da [Lee moon soo] Fix unittest and update comment
27fc306 [Lee moon soo] Cleaning up
f2a66df [Lee moon soo] Initial implementation of interpreter for Apache Flink
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f0301cdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f0301cdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f0301cdf
Branch: refs/heads/master
Commit: f0301cdfa4ba498f1a4725c7eee41823fad7fcd6
Parents: ec7a59c
Author: Lee moon soo <mo...@apache.org>
Authored: Mon Jun 8 00:44:38 2015 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Jun 9 11:14:42 2015 -0700
----------------------------------------------------------------------
_tools/scalastyle.xml | 146 +++++++
conf/zeppelin-site.xml.template | 2 +-
flink/pom.xml | 386 +++++++++++++++++++
.../apache/zeppelin/flink/FlinkEnvironment.java | 83 ++++
.../org/apache/zeppelin/flink/FlinkIMain.java | 92 +++++
.../apache/zeppelin/flink/FlinkInterpreter.java | 312 +++++++++++++++
.../org/apache/zeppelin/flink/JarHelper.java | 219 +++++++++++
.../zeppelin/flink/FlinkInterpreterTest.java | 66 ++++
pom.xml | 1 +
.../zeppelin/conf/ZeppelinConfiguration.java | 3 +-
10 files changed, 1308 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/_tools/scalastyle.xml
----------------------------------------------------------------------
diff --git a/_tools/scalastyle.xml b/_tools/scalastyle.xml
new file mode 100644
index 0000000..f7bb0d4
--- /dev/null
+++ b/_tools/scalastyle.xml
@@ -0,0 +1,146 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!-- NOTE: This was taken and adapted from Apache Spark. -->
+
+<!-- If you wish to turn off checking for a section of code, you can put a comment in the source
+ before and after the section, with the following syntax: -->
+<!-- // scalastyle:off -->
+<!-- ... -->
+<!-- // naughty stuff -->
+<!-- ... -->
+<!-- // scalastyle:on -->
+
+<scalastyle>
+ <name>Scalastyle standard configuration</name>
+ <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <!-- <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxFileLength"><![CDATA[800]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+ <parameters>
+ <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLineLength"><![CDATA[100]]></parameter>
+ <parameter name="tabSize"><![CDATA[2]]></parameter>
+ <parameter name="ignoreImports">true</parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="maxParameters"><![CDATA[10]]></parameter>
+ </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="regex"><![CDATA[println]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxTypes"><![CDATA[30]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maximum"><![CDATA[10]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+ <parameters>
+ <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+ <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+ </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxLength"><![CDATA[50]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> -->
+ <!-- <parameters> -->
+ <!-- <parameter name="maxMethods"><![CDATA[30]]></parameter> -->
+ <!-- </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> -->
+ <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+</scalastyle>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 9f773d5..e10c85e 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -66,7 +66,7 @@
<property>
<name>zeppelin.interpreters</name>
- <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter</value>
+ <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
new file mode 100644
index 0000000..68aa62d
--- /dev/null
+++ b/flink/pom.xml
@@ -0,0 +1,386 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-flink</artifactId>
+ <packaging>jar</packaging>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ <name>Zeppelin: Flink</name>
+ <description>Zeppelin flink support</description>
+ <url>http://zeppelin.incubator.apache.org</url>
+
+ <properties>
+ <flink.version>0.9.0-milestone-1</flink.version>
+ <flink.akka.version>2.3.7</flink.akka.version>
+ <flink.scala.binary.version>2.10</flink.scala.binary.version>
+ <flink.scala.version>2.10.4</flink.scala.version>
+ <scala.macros.version>2.0.1</scala.macros.version>
+ </properties>
+
+ <repositories>
+ <!-- for flink 0.9-SNAPSHOT. After 0.9 released, it can be removed -->
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>zeppelin-interpreter</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${flink.scala.binary.version}</artifactId>
+ <version>${flink.akka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_${flink.scala.binary.version}</artifactId>
+ <version>${flink.akka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j_${flink.scala.binary.version}</artifactId>
+ <version>${flink.akka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_${flink.scala.binary.version}</artifactId>
+ <version>${flink.akka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${flink.scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${flink.scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${flink.scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/.idea/</exclude>
+ <exclude>**/*.iml</exclude>
+ <exclude>.gitignore</exclude>
+ <exclude>**/.settings/*</exclude>
+ <exclude>**/.classpath</exclude>
+ <exclude>**/.project</exclude>
+ <exclude>**/target/**</exclude>
+ <exclude>**/README.md</exclude>
+ <exclude>dependency-reduced-pom.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ <compilerPlugins combine.children="append">
+ <compilerPlugin>
+ <groupId>org.scalamacros</groupId>
+ <artifactId>paradise_${flink.scala.version}</artifactId>
+ <version>${scala.macros.version}</version>
+ </compilerPlugin>
+ </compilerPlugins>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <!-- excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes -->
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../_tools/scalastyle.xml</configLocation>
+ <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <type>${project.packaging}</type>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
new file mode 100644
index 0000000..629932b
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.File;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An ExecutionEnvironment that overrides execute() and getExecutionPlan() so
+ * that the plan is handed to a PlanExecutor created for the configured
+ * host/port, together with a jar file that packages the classes produced by
+ * the scala compiler.
+ */
+public class FlinkEnvironment extends ExecutionEnvironment {
+  Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
+
+  private final String host;
+  private final int port;
+
+  private final FlinkIMain imain;
+
+  /**
+   * @param host  job manager host passed to the remote executor
+   * @param port  job manager port passed to the remote executor
+   * @param imain scala compiler whose compiled classes are packaged into the jar
+   */
+  public FlinkEnvironment(String host, int port, FlinkIMain imain) {
+    this.host = host;
+    this.port = port;
+    this.imain = imain;
+
+    logger.info("jobManager host={}, port={}", host, port);
+  }
+
+  /**
+   * Builds the program plan, packages the compiled classes into a temporary
+   * jar and executes the plan through a remote executor.
+   *
+   * @param jobName name given to the created program plan
+   * @return the result reported by the executor
+   * @throws Exception if the jar cannot be built or the execution fails
+   */
+  @Override
+  public JobExecutionResult execute(String jobName) throws Exception {
+    JavaPlan plan = createProgramPlan(jobName);
+
+    File jarFile = imain.jar();
+    try {
+      PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
+          jarFile.getAbsolutePath());
+      return executor.executePlan(plan);
+    } finally {
+      // the jar is only needed for this call; remove it even when execution fails
+      deleteTemporaryJar(jarFile);
+    }
+  }
+
+  /**
+   * Builds the program plan and asks a remote executor for the optimizer
+   * plan in JSON form.
+   *
+   * @return the optimizer plan as a JSON string
+   * @throws Exception if the jar cannot be built or the executor fails
+   */
+  @Override
+  public String getExecutionPlan() throws Exception {
+    JavaPlan plan = createProgramPlan("unnamed", false);
+    plan.setDefaultParallelism(getParallelism());
+    registerCachedFilesWithPlan(plan);
+
+    File jarFile = imain.jar();
+    try {
+      PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
+          jarFile.getAbsolutePath());
+      return executor.getOptimizerPlanAsJSON(plan);
+    } finally {
+      // same cleanup as execute(): never leave the temporary jar behind
+      deleteTemporaryJar(jarFile);
+    }
+  }
+
+  /**
+   * Removes the temporary jar. A failed delete is logged instead of thrown so
+   * that it cannot hide the result or the exception of the surrounding call.
+   */
+  private void deleteTemporaryJar(File jarFile) {
+    if (jarFile != null && jarFile.isFile() && !jarFile.delete()) {
+      logger.warn("Unable to delete temporary jar file {}", jarFile.getAbsolutePath());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
new file mode 100644
index 0000000..ee6516c
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.reflect.io.AbstractFile;
+import scala.reflect.io.VirtualDirectory;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.IMain;
+
+/**
+ * Scala compiler (IMain) that can additionally package the classes held in
+ * its virtual directory into a jar file.
+ */
+public class FlinkIMain extends IMain {
+  Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
+
+  public FlinkIMain(Settings setting, PrintWriter out) {
+    super(setting, out);
+  }
+
+  /**
+   * Copies the files found one level below the compiler's virtual directory
+   * into a temporary build directory and jars that directory up.
+   *
+   * @return the created jar file, located under java.io.tmpdir; the caller
+   *         is responsible for deleting it
+   * @throws IOException if the build directory cannot be created, or copying
+   *         or jarring fails
+   */
+  public File jar() throws IOException {
+    VirtualDirectory classDir = virtualDirectory();
+    // create execution environment
+    File jarBuildDir = new File(System.getProperty("java.io.tmpdir")
+        + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis());
+    if (!jarBuildDir.mkdirs() && !jarBuildDir.isDirectory()) {
+      throw new IOException("Unable to create directory " + jarBuildDir.getAbsolutePath());
+    }
+
+    File jarFile = new File(System.getProperty("java.io.tmpdir")
+        + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar");
+
+    try {
+      Iterator<AbstractFile> vdIt = classDir.iterator();
+      while (vdIt.hasNext()) {
+        AbstractFile fi = vdIt.next();
+        if (fi.isDirectory()) {
+          // directory for compiled line
+          copyEntries(fi, new File(jarBuildDir.getAbsolutePath(), fi.name()));
+        }
+      }
+
+      // jar up
+      JarHelper jh = new JarHelper();
+      jh.jarDir(jarBuildDir, jarFile);
+    } finally {
+      // remove the build directory on failure as well; a failed cleanup is only
+      // logged so that it cannot mask the exception that caused it
+      if (!FileUtils.deleteQuietly(jarBuildDir)) {
+        logger.warn("Unable to delete temporary directory {}", jarBuildDir.getAbsolutePath());
+      }
+    }
+    return jarFile;
+  }
+
+  /**
+   * Writes every entry of the given virtual directory as a file into lineDir,
+   * closing both streams even when the copy fails.
+   */
+  private void copyEntries(AbstractFile sourceDir, File lineDir) throws IOException {
+    Iterator<AbstractFile> fiIt = sourceDir.iterator();
+    while (fiIt.hasNext()) {
+      AbstractFile f = fiIt.next();
+
+      // created on demand: a directory without entries produces no output directory
+      lineDir.mkdirs();
+
+      // compiled classes for commands from shell
+      File writeFile = new File(lineDir.getAbsolutePath(), f.name());
+      FileOutputStream outputStream = new FileOutputStream(writeFile);
+      try {
+        InputStream inputStream = f.input();
+        try {
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream);
+        } finally {
+          inputStream.close();
+        }
+      } finally {
+        outputStream.close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
new file mode 100644
index 0000000..b342f4e
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Console;
+import scala.None;
+import scala.Some;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Interpreter for Apache Flink (http://flink.apache.org)
+ */
+public class FlinkInterpreter extends Interpreter {
+  Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
+  private Settings settings;
+  // buffer that collects everything the Scala interpreter prints
+  private ByteArrayOutputStream out;
+  private FlinkIMain imain;
+  // map shared with the Scala interpreter (the "_binder" variable)
+  private Map<String, Object> binder;
+  private ExecutionEnvironment env;
+  private Configuration flinkConf;
+  // only set while a local mini cluster is running
+  private LocalFlinkMiniCluster localFlinkCluster;
+
+  public FlinkInterpreter(Properties property) {
+    super(property);
+  }
+
+  // register this interpreter together with its default properties
+  static {
+    Interpreter.register(
+        "flink",
+        "flink",
+        FlinkInterpreter.class.getName(),
+        new InterpreterPropertyBuilder()
+            .add("local", "true", "Run flink locally")
+            .add("jobmanager.rpc.address", "localhost", "Flink cluster")
+            .add("jobmanager.rpc.port", "6123", "Flink cluster")
+            .build()
+    );
+  }
+
+  /**
+   * Creates the Scala interpreter with a classpath assembled from the context
+   * classloader, java.class.path and the interpreter classloader urls, then
+   * initializes the Flink environment.
+   */
+  @Override
+  public void open() {
+    URL[] urls = getClassloaderUrls();
+    this.settings = new Settings();
+
+    // set classpath
+    PathSetting pathSettings = settings.classpath();
+    StringBuilder classpath = new StringBuilder();
+    for (File f : currentClassPath()) {
+      appendClasspathEntry(classpath, f.getAbsolutePath());
+    }
+
+    if (urls != null) {
+      for (URL u : urls) {
+        appendClasspathEntry(classpath, u.getFile());
+      }
+    }
+
+    pathSettings.v_$eq(classpath.toString());
+    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+    settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
+        .getContextClassLoader()));
+    BooleanSetting b = (BooleanSetting) settings.usejavacp();
+    b.v_$eq(true);
+    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+    out = new ByteArrayOutputStream();
+    imain = new FlinkIMain(settings, new PrintWriter(out));
+
+    initializeFlinkEnv();
+  }
+
+  /**
+   * Appends one entry to a classpath string, inserting the platform path
+   * separator when the classpath is not empty.
+   */
+  private static void appendClasspathEntry(StringBuilder classpath, String entry) {
+    if (classpath.length() > 0) {
+      classpath.append(File.pathSeparator);
+    }
+    classpath.append(entry);
+  }
+
+  /** @return true when the "local" property parses to true */
+  private boolean localMode() {
+    return Boolean.parseBoolean(getProperty("local"));
+  }
+
+  private String getRpcAddress() {
+    if (localMode()) {
+      return "localhost";
+    } else {
+      return getProperty("jobmanager.rpc.address");
+    }
+  }
+
+  /**
+   * In local mode the port is read from the running mini cluster, so the
+   * cluster has to be started before this method is called.
+   */
+  private int getRpcPort() {
+    if (localMode()) {
+      return localFlinkCluster.getJobManagerRPCPort();
+    } else {
+      return Integer.parseInt(getProperty("jobmanager.rpc.port"));
+    }
+  }
+
+  /**
+   * Copies the interpreter properties into a Flink configuration, starts the
+   * mini cluster in local mode and binds a Flink execution environment to the
+   * variable "env" inside the Scala interpreter.
+   */
+  private void initializeFlinkEnv() {
+    // prepare bindings
+    imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+    @SuppressWarnings("unchecked") // _binder is declared as HashMap[String, Object] above
+    Map<String, Object> bound = (Map<String, Object>) getValue("_binder");
+    binder = bound;
+
+    flinkConf = new org.apache.flink.configuration.Configuration();
+    Properties intpProperty = getProperty();
+    for (Object k : intpProperty.keySet()) {
+      String key = (String) k;
+      String val = toString(intpProperty.get(key));
+      flinkConf.setString(key, val);
+    }
+
+    // the cluster must be up before getRpcPort() is evaluated below
+    if (localMode()) {
+      startFlinkMiniCluster();
+    }
+
+    env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain);
+    binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env));
+
+    // do import and create val
+    imain.interpret("@transient val env = "
+        + "_binder.get(\"env\")"
+        + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]");
+
+    imain.interpret("import org.apache.flink.api.scala._");
+  }
+
+
+  /**
+   * @return the urls of the context classloader followed by the entries of
+   *         the java.class.path system property
+   */
+  private List<File> currentClassPath() {
+    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+    // String.split never returns null, no null check required
+    for (String cp : System.getProperty("java.class.path").split(File.pathSeparator)) {
+      paths.add(new File(cp));
+    }
+    return paths;
+  }
+
+  /**
+   * @return the urls of the given classloader as files; empty when the
+   *         classloader is null or is not a URLClassLoader
+   */
+  private List<File> classPath(ClassLoader cl) {
+    List<File> paths = new LinkedList<File>();
+    if (!(cl instanceof URLClassLoader)) {
+      return paths;
+    }
+
+    URL[] urls = ((URLClassLoader) cl).getURLs();
+    if (urls != null) {
+      for (URL url : urls) {
+        paths.add(new File(url.getFile()));
+      }
+    }
+    return paths;
+  }
+
+  /**
+   * Looks up a term defined in the Scala interpreter.
+   *
+   * @param name name of the term
+   * @return the unwrapped value, or null when the interpreter returns an
+   *         empty Option
+   */
+  public Object getValue(String name) {
+    Object ret = imain.valueOfTerm(name);
+    // The None singleton is an instance of scala.None$, so an
+    // "instanceof scala.None" test never matches; test against Option instead.
+    if (ret instanceof scala.Option) {
+      scala.Option<?> option = (scala.Option<?>) ret;
+      return option.isEmpty() ? null : option.get();
+    }
+    return ret;
+  }
+
+  /**
+   * Closes the Scala interpreter and, in local mode, shuts the mini cluster
+   * down. The cluster is stopped even when closing the interpreter fails.
+   */
+  @Override
+  public void close() {
+    try {
+      if (imain != null) {
+        imain.close();
+      }
+    } finally {
+      if (localMode()) {
+        stopFlinkMiniCluster();
+      }
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String line, InterpreterContext context) {
+    if (line == null || line.trim().length() == 0) {
+      return new InterpreterResult(Code.SUCCESS);
+    }
+
+    return interpret(line.split("\n"), context);
+  }
+
+  /**
+   * Feeds the given lines to the Scala interpreter one by one. Lines that are
+   * reported as incomplete are accumulated and re-submitted together with the
+   * following line.
+   *
+   * @return ERROR with the captured output as soon as a line fails,
+   *         INCOMPLETE when the last statement is still incomplete, otherwise
+   *         the last result code with the captured output
+   */
+  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+    // append a trailing print("") statement to the submitted lines
+    String[] linesToRun = new String[lines.length + 1];
+    System.arraycopy(lines, 0, linesToRun, 0, lines.length);
+    linesToRun[lines.length] = "print(\"\")";
+
+    Console.setOut(out);
+    out.reset();
+    Code r = null;
+
+    String incomplete = "";
+    for (String s : linesToRun) {
+      scala.tools.nsc.interpreter.Results.Result res = null;
+      try {
+        res = imain.interpret(incomplete + s);
+      } catch (Exception e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
+      }
+
+      r = getResultCode(res);
+
+      if (r == Code.ERROR) {
+        return new InterpreterResult(r, out.toString());
+      } else if (r == Code.INCOMPLETE) {
+        incomplete += s + "\n";
+      } else {
+        incomplete = "";
+      }
+    }
+
+    if (r == Code.INCOMPLETE) {
+      return new InterpreterResult(r, "Incomplete expression");
+    } else {
+      return new InterpreterResult(r, out.toString());
+    }
+  }
+
+  /** Maps a Scala interpreter result to a Zeppelin result code. */
+  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+      return Code.SUCCESS;
+    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+      return Code.INCOMPLETE;
+    } else {
+      return Code.ERROR;
+    }
+  }
+
+
+
+  /** Not implemented, does nothing. */
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  /** Not implemented, always returns 0. */
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  /** Not implemented, always returns an empty list. */
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return new LinkedList<String>();
+  }
+
+  private void startFlinkMiniCluster() {
+    localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
+    localFlinkCluster.waitForTaskManagersToBeRegistered();
+  }
+
+  private void stopFlinkMiniCluster() {
+    if (localFlinkCluster != null) {
+      localFlinkCluster.shutdown();
+      localFlinkCluster = null;
+    }
+  }
+
+  /** @return the given object when it is a String, otherwise an empty String */
+  static final String toString(Object o) {
+    return (o instanceof String) ? (String) o : "";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
new file mode 100644
index 0000000..efc4951
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+
+/**
+ * This class is copied from flink-scala-shell. Once flink-0.9 is published in
+ * the Maven repository, this class can be removed.
+ *
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from
+ * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans
+ * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <pc...@bea.com>
+ */
+public class JarHelper {
+ // ========================================================================
+ // Constants
+
+ private static final int BUFFER_SIZE = 2156;
+
+ // ========================================================================
+ // Variables
+
+ private byte[] mBuffer = new byte[BUFFER_SIZE];
+ private int mByteCount = 0;
+ private boolean mVerbose = false;
+ private String mDestJarName = "";
+
+ // ========================================================================
+ // Constructor
+
+ /**
+ * Instantiates a new JarHelper.
+ */
+ public JarHelper() {
+ }
+
+ // ========================================================================
+ // Public methods
+
+ /**
+ * Jars a given directory or single file into a JarOutputStream.
+ */
+ public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
+
+ if (dirOrFile2Jar == null || destJar == null) {
+ throw new IllegalArgumentException();
+ }
+
+ mDestJarName = destJar.getCanonicalPath();
+ FileOutputStream fout = new FileOutputStream(destJar);
+ JarOutputStream jout = new JarOutputStream(fout);
+ // jout.setLevel(0);
+ try {
+ jarDir(dirOrFile2Jar, jout, null);
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ jout.close();
+ fout.close();
+ }
+ }
+
+ /**
+ * Unjars a given jar file into a given directory.
+ */
+ public void unjarDir(File jarFile, File destDir) throws IOException {
+ BufferedOutputStream dest = null;
+ FileInputStream fis = new FileInputStream(jarFile);
+ unjar(fis, destDir);
+ }
+
+ /**
+ * Given an InputStream on a jar file, unjars the contents into the given
+ * directory.
+ */
+ public void unjar(InputStream in, File destDir) throws IOException {
+ BufferedOutputStream dest = null;
+ JarInputStream jis = new JarInputStream(in);
+ JarEntry entry;
+ while ((entry = jis.getNextJarEntry()) != null) {
+ if (entry.isDirectory()) {
+ File dir = new File(destDir, entry.getName());
+ dir.mkdir();
+ if (entry.getTime() != -1) {
+ dir.setLastModified(entry.getTime());
+ }
+ continue;
+ }
+ int count;
+ byte[] data = new byte[BUFFER_SIZE];
+ File destFile = new File(destDir, entry.getName());
+ if (mVerbose) {
+ System.out
+ .println("unjarring " + destFile + " from " + entry.getName());
+ }
+ FileOutputStream fos = new FileOutputStream(destFile);
+ dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+ while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+ dest.write(data, 0, count);
+ }
+ dest.flush();
+ dest.close();
+ if (entry.getTime() != -1) {
+ destFile.setLastModified(entry.getTime());
+ }
+ }
+ jis.close();
+ }
+
+ public void setVerbose(boolean b) {
+ mVerbose = b;
+ }
+
+ // ========================================================================
+ // Private methods
+
+ private static final char SEP = '/';
+
+ /**
+ * Recursively jars up the given path under the given directory.
+ */
+ private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+ throws IOException {
+ if (mVerbose) {
+ System.out.println("checking " + dirOrFile2jar);
+ }
+ if (dirOrFile2jar.isDirectory()) {
+ String[] dirList = dirOrFile2jar.list();
+ String subPath = (path == null) ? ""
+ : (path + dirOrFile2jar.getName() + SEP);
+ if (path != null) {
+ JarEntry je = new JarEntry(subPath);
+ je.setTime(dirOrFile2jar.lastModified());
+ jos.putNextEntry(je);
+ jos.flush();
+ jos.closeEntry();
+ }
+ for (int i = 0; i < dirList.length; i++) {
+ File f = new File(dirOrFile2jar, dirList[i]);
+ jarDir(f, jos, subPath);
+ }
+ } else {
+ if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
+ if (mVerbose) {
+ System.out.println("skipping " + dirOrFile2jar.getPath());
+ }
+ return;
+ }
+
+ if (mVerbose) {
+ System.out.println("adding " + dirOrFile2jar.getPath());
+ }
+ FileInputStream fis = new FileInputStream(dirOrFile2jar);
+ try {
+ JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
+ entry.setTime(dirOrFile2jar.lastModified());
+ jos.putNextEntry(entry);
+ while ((mByteCount = fis.read(mBuffer)) != -1) {
+ jos.write(mBuffer, 0, mByteCount);
+ if (mVerbose) {
+ System.out.println("wrote " + mByteCount + " bytes");
+ }
+ }
+ jos.flush();
+ jos.closeEntry();
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ fis.close();
+ }
+ }
+ }
+
+ // for debugging
+ public static void main(String[] args) throws IOException {
+ if (args.length < 2) {
+ System.err.println("Usage: JarHelper jarname.jar directory");
+ return;
+ }
+
+ JarHelper jarHelper = new JarHelper();
+ jarHelper.mVerbose = true;
+
+ File destJar = new File(args[0]);
+ File dirOrFile2Jar = new File(args[1]);
+
+ jarHelper.jarDir(dirOrFile2Jar, destJar);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
new file mode 100644
index 0000000..264008a
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FlinkInterpreterTest {
+
+ private static FlinkInterpreter flink;
+ private static InterpreterContext context;
+
+ @BeforeClass
+ public static void setUp() {
+ Properties p = new Properties();
+ flink = new FlinkInterpreter(p);
+ flink.open();
+ context = new InterpreterContext(null, null, null, null, null, null, null);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ flink.close();
+ flink.destroy();
+ }
+
+ @Test
+ public void testSimpleStatement() {
+ InterpreterResult result = flink.interpret("val a=1", context);
+ result = flink.interpret("print(a)", context);
+ assertEquals("1", result.message());
+ }
+
+
+ @Test
+ public void testWordCount() {
+ flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
+ flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
+ flink.interpret("counts.print()", context);
+ InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
+ assertEquals(Code.SUCCESS, result.code());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bbde084..dd3c2a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
<module>shell</module>
<module>hive</module>
<module>tajo</module>
+ <module>flink</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index bbf46fc..78a463c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -389,7 +389,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
- + "org.apache.zeppelin.tajo.TajoInterpreter"),
+ + "org.apache.zeppelin.tajo.TajoInterpreter,"
+ + "org.apache.zeppelin.flink.FlinkInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),