You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/09 20:14:51 UTC

incubator-zeppelin git commit: ZEPPELIN-44 Interpreter for Apache Flink

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master ec7a59c09 -> f0301cdfa


ZEPPELIN-44 Interpreter for Apache Flink

#### Interpreter for [Apache Flink](http://flink.apache.org/).

The Flink community helped a lot with writing this interpreter. Thanks so much! Some code is copied from Flink's development branch. Once Flink releases 0.9, the copied code and the snapshot repository configuration will be removed.

#### Build

If no options are given, it builds against Flink 0.9.0-milestone-1 by default.
In combination with Zeppelin, it is a good idea to use 0.9-SNAPSHOT, because it supports .collect(), which makes it much easier to retrieve result data and display it in Zeppelin.

So you might want to build it this way:

```
mvn package -Dflink.version=0.9-SNAPSHOT -DskipTests
```

#### Screenshot

![image](https://cloud.githubusercontent.com/assets/1540981/7674014/32c0ee68-fd59-11e4-9ee6-5ec0bbf6a8f2.png)

Author: Lee moon soo <mo...@apache.org>

Closes #75 from Leemoonsoo/flink and squashes the following commits:

f08bd25 [Lee moon soo] Update pom.xml after https://github.com/apache/incubator-zeppelin/pull/88
460cf46 [Lee moon soo] jarr up -> jar up
501efb3 [Lee moon soo] Add scalastyle
e69e5ba [Lee moon soo] Add license
7be1f90 [Lee moon soo] Add apache snapshot repo
ebbd0da [Lee moon soo] Fix unittest and update comment
27fc306 [Lee moon soo] Cleaning up
f2a66df [Lee moon soo] Initial implementation of interpreter for Apache Flink


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f0301cdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f0301cdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f0301cdf

Branch: refs/heads/master
Commit: f0301cdfa4ba498f1a4725c7eee41823fad7fcd6
Parents: ec7a59c
Author: Lee moon soo <mo...@apache.org>
Authored: Mon Jun 8 00:44:38 2015 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Jun 9 11:14:42 2015 -0700

----------------------------------------------------------------------
 _tools/scalastyle.xml                           | 146 +++++++
 conf/zeppelin-site.xml.template                 |   2 +-
 flink/pom.xml                                   | 386 +++++++++++++++++++
 .../apache/zeppelin/flink/FlinkEnvironment.java |  83 ++++
 .../org/apache/zeppelin/flink/FlinkIMain.java   |  92 +++++
 .../apache/zeppelin/flink/FlinkInterpreter.java | 312 +++++++++++++++
 .../org/apache/zeppelin/flink/JarHelper.java    | 219 +++++++++++
 .../zeppelin/flink/FlinkInterpreterTest.java    |  66 ++++
 pom.xml                                         |   1 +
 .../zeppelin/conf/ZeppelinConfiguration.java    |   3 +-
 10 files changed, 1308 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/_tools/scalastyle.xml
----------------------------------------------------------------------
diff --git a/_tools/scalastyle.xml b/_tools/scalastyle.xml
new file mode 100644
index 0000000..f7bb0d4
--- /dev/null
+++ b/_tools/scalastyle.xml
@@ -0,0 +1,146 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!-- NOTE: This was taken and adapted from Apache Spark. -->
+
+<!-- If you wish to turn off checking for a section of code, you can put a comment in the source
+ before and after the section, with the following syntax: -->
+<!-- // scalastyle:off -->
+<!-- ... -->
+<!-- // naughty stuff -->
+<!-- ... -->
+<!-- // scalastyle:on -->
+
+<scalastyle>
+ <name>Scalastyle standard configuration</name>
+ <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <!-- <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxFileLength"><![CDATA[800]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+  <parameters>
+      <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+  <parameters>
+   <parameter name="maxLineLength"><![CDATA[100]]></parameter>
+   <parameter name="tabSize"><![CDATA[2]]></parameter>
+   <parameter name="ignoreImports">true</parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+  <parameters>
+   <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+  </parameters>
+ </check>
+ <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+  <parameters>
+   <parameter name="maxParameters"><![CDATA[10]]></parameter>
+  </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> -->
+ <!-- <check level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="regex"><![CDATA[println]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxTypes"><![CDATA[30]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maximum"><![CDATA[10]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check>
+ <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+  <parameters>
+   <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+   <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+  </parameters>
+ </check>
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxLength"><![CDATA[50]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> -->
+ <!--  <parameters> -->
+ <!--   <parameter name="maxMethods"><![CDATA[30]]></parameter> -->
+ <!--  </parameters> -->
+ <!-- </check> -->
+ <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> -->
+ <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+</scalastyle>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 9f773d5..e10c85e 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -66,7 +66,7 @@
 
 <property>
   <name>zeppelin.interpreters</name>
-  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter</value>
+  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter become a default</description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
new file mode 100644
index 0000000..68aa62d
--- /dev/null
+++ b/flink/pom.xml
@@ -0,0 +1,386 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-flink</artifactId>
+  <packaging>jar</packaging>
+  <version>0.5.0-incubating-SNAPSHOT</version>
+  <name>Zeppelin: Flink</name>
+  <description>Zeppelin flink support</description>
+  <url>http://zeppelin.incubator.apache.org</url>
+
+  <properties>
+    <flink.version>0.9.0-milestone-1</flink.version>
+    <flink.akka.version>2.3.7</flink.akka.version>
+    <flink.scala.binary.version>2.10</flink.scala.binary.version>
+    <flink.scala.version>2.10.4</flink.scala.version>
+    <scala.macros.version>2.0.1</scala.macros.version>
+  </properties>
+
+  <repositories>
+    <!-- for flink 0.9-SNAPSHOT. After 0.9 released, it can be removed -->
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_${flink.scala.binary.version}</artifactId>
+      <version>${flink.akka.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-remote_${flink.scala.binary.version}</artifactId>
+      <version>${flink.akka.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_${flink.scala.binary.version}</artifactId>
+      <version>${flink.akka.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-testkit_${flink.scala.binary.version}</artifactId>
+      <version>${flink.akka.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${flink.scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${flink.scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${flink.scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/.idea/</exclude>
+            <exclude>**/*.iml</exclude>
+            <exclude>.gitignore</exclude>
+            <exclude>**/.settings/*</exclude>
+            <exclude>**/.classpath</exclude>
+            <exclude>**/.project</exclude>
+            <exclude>**/target/**</exclude>
+            <exclude>**/README.md</exclude>
+            <exclude>dependency-reduced-pom.xml</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+
+      <!-- Scala Compiler -->
+      <plugin>
+	<groupId>net.alchim31.maven</groupId>
+	<artifactId>scala-maven-plugin</artifactId>
+	<version>3.1.4</version>
+	<executions>
+	  <!-- Run scala compiler in the process-resources phase, so that dependencies on
+	       scala classes can be resolved later in the (Java) compile phase -->
+	  <execution>
+	    <id>scala-compile-first</id>
+	    <phase>process-resources</phase>
+	    <goals>
+	      <goal>compile</goal>
+	    </goals>
+	  </execution>
+          
+	  <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+	       scala classes can be resolved later in the (Java) test-compile phase -->
+	  <execution>
+	    <id>scala-test-compile</id>
+	    <phase>process-test-resources</phase>
+	    <goals>
+	      <goal>testCompile</goal>
+	    </goals>
+	  </execution>
+	</executions>
+	<configuration>
+	  <jvmArgs>
+	    <jvmArg>-Xms128m</jvmArg>
+	    <jvmArg>-Xmx512m</jvmArg>
+	  </jvmArgs>
+	  <compilerPlugins combine.children="append">
+	    <compilerPlugin>
+	      <groupId>org.scalamacros</groupId>
+	      <artifactId>paradise_${flink.scala.version}</artifactId>
+	      <version>${scala.macros.version}</version>
+	    </compilerPlugin>
+	  </compilerPlugins>
+	</configuration>
+      </plugin>
+
+      <!-- Eclipse Integration -->
+      <plugin>
+	<groupId>org.apache.maven.plugins</groupId>
+	<artifactId>maven-eclipse-plugin</artifactId>
+	<version>2.8</version>
+	<configuration>
+	  <downloadSources>true</downloadSources>
+	  <projectnatures>
+	    <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+	    <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+	  </projectnatures>
+	  <buildcommands>
+	    <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+	  </buildcommands>
+	  <classpathContainers>
+	    <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+	    <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+	  </classpathContainers>
+	  <!-- excludes>
+	    <exclude>org.scala-lang:scala-library</exclude>
+	    <exclude>org.scala-lang:scala-compiler</exclude>
+	  </excludes -->
+	  <sourceIncludes>
+	    <sourceInclude>**/*.scala</sourceInclude>
+	    <sourceInclude>**/*.java</sourceInclude>
+	  </sourceIncludes>
+	</configuration>
+      </plugin>
+
+      <!-- Adding scala source directories to build path -->
+      <plugin>
+	<groupId>org.codehaus.mojo</groupId>
+	<artifactId>build-helper-maven-plugin</artifactId>
+	<version>1.7</version>
+	<executions>
+	  <!-- Add src/main/scala to eclipse build path -->
+	  <execution>
+	    <id>add-source</id>
+	    <phase>generate-sources</phase>
+	    <goals>
+	      <goal>add-source</goal>
+	    </goals>
+	    <configuration>
+	      <sources>
+		<source>src/main/scala</source>
+	      </sources>
+	    </configuration>
+	  </execution>
+	  <!-- Add src/test/scala to eclipse build path -->
+	  <execution>
+	    <id>add-test-source</id>
+	    <phase>generate-test-sources</phase>
+	    <goals>
+	      <goal>add-test-source</goal>
+	    </goals>
+	    <configuration>
+	      <sources>
+		<source>src/test/scala</source>
+	      </sources>
+	    </configuration>
+	  </execution>
+	</executions>
+      </plugin>
+
+      <plugin>
+	<groupId>org.scalastyle</groupId>
+	<artifactId>scalastyle-maven-plugin</artifactId>
+	<version>0.5.0</version>
+	<executions>
+	  <execution>
+	    <goals>
+	      <goal>check</goal>
+	    </goals>
+	  </execution>
+	</executions>
+	<configuration>
+	  <verbose>false</verbose>
+	  <failOnViolation>true</failOnViolation>
+	  <includeTestSourceDirectory>true</includeTestSourceDirectory>
+	  <failOnWarning>false</failOnWarning>
+	  <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+	  <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+	  <configLocation>${project.basedir}/../_tools/scalastyle.xml</configLocation>
+	  <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
+	  <outputEncoding>UTF-8</outputEncoding>
+	</configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>2.7</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <forkCount>1</forkCount>
+          <reuseForks>false</reuseForks>
+          <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>${project.artifactId}</artifactId>
+                  <version>${project.version}</version>
+                  <type>${project.packaging}</type>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
new file mode 100644
index 0000000..629932b
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.File;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class override execute() method to create an PlanExecutor with
+ * jar file that packages classes from scala compiler.
+ */
+public class FlinkEnvironment extends ExecutionEnvironment {
+  Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
+
+  private String host;
+  private int port;
+
+  private FlinkIMain imain;
+
+  public FlinkEnvironment(String host, int port, FlinkIMain imain) {
+    this.host = host;
+    this.port = port;
+    this.imain = imain;
+
+    logger.info("jobManager host={}, port={}", host, port);
+  }
+
+  @Override
+  public JobExecutionResult execute(String jobName) throws Exception {
+    JavaPlan plan = createProgramPlan(jobName);
+
+    File jarFile = imain.jar();
+    PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
+        jarFile.getAbsolutePath());
+
+    JobExecutionResult result = executor.executePlan(plan);
+
+    if (jarFile.isFile()) {
+      jarFile.delete();
+    }
+
+    return result;
+  }
+
+  @Override
+  public String getExecutionPlan() throws Exception {
+    JavaPlan plan = createProgramPlan("unnamed", false);
+    plan.setDefaultParallelism(getParallelism());
+    registerCachedFilesWithPlan(plan);
+
+    File jarFile = imain.jar();
+    PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
+        jarFile.getAbsolutePath());
+    String jsonPlan = executor.getOptimizerPlanAsJSON(plan);
+
+    if (jarFile != null && jarFile.isFile()) {
+      jarFile.delete();
+    }
+
+    return jsonPlan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
new file mode 100644
index 0000000..ee6516c
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.reflect.io.AbstractFile;
+import scala.reflect.io.VirtualDirectory;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.IMain;
+
+/**
+ * Scala compiler
+ */
+public class FlinkIMain extends IMain {
+  Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
+
+  public FlinkIMain(Settings setting, PrintWriter out) {
+    super(setting, out);
+  }
+
+  public File jar() throws IOException {
+    VirtualDirectory classDir = virtualDirectory();
+    // create execution environment
+    File jarBuildDir = new File(System.getProperty("java.io.tmpdir")
+        + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis());
+    jarBuildDir.mkdirs();
+
+    File jarFile = new File(System.getProperty("java.io.tmpdir")
+        + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar");
+
+
+    Iterator<AbstractFile> vdIt = classDir.iterator();
+    while (vdIt.hasNext()) {
+      AbstractFile fi = vdIt.next();
+      if (fi.isDirectory()) {
+        Iterator<AbstractFile> fiIt = fi.iterator();
+        while (fiIt.hasNext()) {
+          AbstractFile f = fiIt.next();
+
+          // directory for compiled line
+          File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name());
+          lineDir.mkdirs();
+
+          // compiled classes for commands from shell
+          File writeFile = new File(lineDir.getAbsolutePath(), f.name());
+          FileOutputStream outputStream = new FileOutputStream(writeFile);
+          InputStream inputStream = f.input();
+
+          // copy file contents
+          org.apache.commons.io.IOUtils.copy(inputStream, outputStream);
+
+          inputStream.close();
+          outputStream.close();
+        }
+      }
+    }
+
+    // jar up
+    JarHelper jh = new JarHelper();
+    jh.jarDir(jarBuildDir, jarFile);
+
+    FileUtils.deleteDirectory(jarBuildDir);
+    return jarFile;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
new file mode 100644
index 0000000..b342f4e
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Console;
+import scala.None;
+import scala.Some;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Interpreter for Apache Flink (http://flink.apache.org)
+ */
+public class FlinkInterpreter extends Interpreter {
+  Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
+  private Settings settings;                        // scala compiler settings for the embedded REPL
+  private ByteArrayOutputStream out;                // captures everything the REPL prints
+  private FlinkIMain imain;                         // embedded scala interpreter
+  private Map<String, Object> binder;               // bridge map shared between Java and the REPL session
+  private ExecutionEnvironment env;
+  private Configuration flinkConf;
+  private LocalFlinkMiniCluster localFlinkCluster;  // only populated in local mode
+
+  public FlinkInterpreter(Properties property) {
+    super(property);
+  }
+
+  static {
+    // register this interpreter under group/name "flink" with its configurable properties
+    Interpreter.register(
+        "flink",
+        "flink",
+        FlinkInterpreter.class.getName(),
+        new InterpreterPropertyBuilder()
+          .add("local", "true", "Run flink locally")
+          .add("jobmanager.rpc.address", "localhost", "Flink cluster")
+          .add("jobmanager.rpc.port", "6123", "Flink cluster")
+          .build()
+    );
+  }
+
+  /**
+   * Creates the embedded scala REPL with a classpath assembled from the current
+   * classloader plus any additional interpreter urls, then initializes the
+   * Flink execution environment inside that REPL session.
+   */
+  @Override
+  public void open() {
+    URL[] urls = getClassloaderUrls();
+    this.settings = new Settings();
+
+    // set classpath: everything visible to the context classloader plus java.class.path ...
+    PathSetting pathSettings = settings.classpath();
+    String classpath = "";
+    List<File> paths = currentClassPath();
+    for (File f : paths) {
+      if (classpath.length() > 0) {
+        classpath += File.pathSeparator;
+      }
+      classpath += f.getAbsolutePath();
+    }
+
+    // ... plus any urls handed to this interpreter instance
+    if (urls != null) {
+      for (URL u : urls) {
+        if (classpath.length() > 0) {
+          classpath += File.pathSeparator;
+        }
+        classpath += u.getFile();
+      }
+    }
+
+    pathSettings.v_$eq(classpath);
+    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+    settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
+        .getContextClassLoader()));
+    // also let the REPL use the JVM's own classpath
+    BooleanSetting b = (BooleanSetting) settings.usejavacp();
+    b.v_$eq(true);
+    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+    out = new ByteArrayOutputStream();
+    imain = new FlinkIMain(settings, new PrintWriter(out));
+
+    initializeFlinkEnv();
+  }
+
+  /** @return true when the "local" property is set, i.e. run an in-process mini cluster. */
+  private boolean localMode() {
+    return Boolean.parseBoolean(getProperty("local"));
+  }
+
+  /** Job manager address: fixed to localhost in local mode, configured otherwise. */
+  private String getRpcAddress() {
+    if (localMode()) {
+      return "localhost";
+    } else {
+      return getProperty("jobmanager.rpc.port");
+    }
+  }
+
+  /** Job manager port: asked from the mini cluster in local mode, configured otherwise. */
+  private int getRpcPort() {
+    if (localMode()) {
+      return localFlinkCluster.getJobManagerRPCPort();
+    } else {
+      return Integer.parseInt(getProperty("jobmanager.rpc.port"));
+    }
+  }
+
+  /**
+   * Binds a Flink ExecutionEnvironment into the REPL session as "env" and
+   * imports the Flink scala API, starting a local mini cluster first if needed.
+   */
+  private void initializeFlinkEnv() {
+    // prepare bindings: a HashMap created inside the REPL and read back here
+    imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+    binder = (Map<String, Object>) getValue("_binder"); // unchecked cast; _binder is created just above
+
+    // copy every interpreter property into the flink configuration
+    flinkConf = new org.apache.flink.configuration.Configuration();
+    Properties intpProperty = getProperty();
+    for (Object k : intpProperty.keySet()) {
+      String key = (String) k;
+      String val = toString(intpProperty.get(key));
+      flinkConf.setString(key, val);
+    }
+
+    if (localMode()) {
+      startFlinkMiniCluster();
+    }
+
+    env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain);
+    binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env));
+
+    // do import and create val "env" inside the REPL, typed as the scala API environment
+    imain.interpret("@transient val env = "
+        + "_binder.get(\"env\")"
+        + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]");
+
+    imain.interpret("import org.apache.flink.api.scala._");
+  }
+
+
+  /** Collects the context classloader's urls plus every entry of java.class.path. */
+  private List<File> currentClassPath() {
+    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+    if (cps != null) {
+      for (String cp : cps) {
+        paths.add(new File(cp));
+      }
+    }
+    return paths;
+  }
+
+  /** Extracts classpath entries from a classloader; empty unless it is a URLClassLoader. */
+  private List<File> classPath(ClassLoader cl) {
+    List<File> paths = new LinkedList<File>();
+    if (cl == null) {
+      return paths;
+    }
+
+    if (cl instanceof URLClassLoader) {
+      URLClassLoader ucl = (URLClassLoader) cl;
+      URL[] urls = ucl.getURLs();
+      if (urls != null) {
+        for (URL url : urls) {
+          paths.add(new File(url.getFile()));
+        }
+      }
+    }
+    return paths;
+  }
+
+  /** Resolves a term defined in the REPL session, unwrapping scala's Option. */
+  public Object getValue(String name) {
+    Object ret = imain.valueOfTerm(name);
+    if (ret instanceof None) {
+      return null;
+    } else if (ret instanceof Some) {
+      return ((Some) ret).get();
+    } else {
+      return ret;
+    }
+  }
+
+  @Override
+  public void close() {
+    imain.close();
+
+    if (localMode()) {
+      stopFlinkMiniCluster();
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String line, InterpreterContext context) {
+    // empty or blank paragraphs are treated as an immediate success
+    if (line == null || line.trim().length() == 0) {
+      return new InterpreterResult(Code.SUCCESS);
+    }
+
+    InterpreterResult result = interpret(line.split("\n"), context);
+    return result;
+  }
+
+  /**
+   * Interprets the given lines one by one, accumulating syntactically
+   * incomplete lines until they form a complete expression.
+   */
+  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+    // append print("") as a final statement after the user's lines
+    String[] linesToRun = new String[lines.length + 1];
+    for (int i = 0; i < lines.length; i++) {
+      linesToRun[i] = lines[i];
+    }
+    linesToRun[lines.length] = "print(\"\")";
+
+    // route scala's Console output into our buffer and start from a clean slate
+    Console.setOut(out);
+    out.reset();
+    Code r = null;
+
+    String incomplete = "";
+    for (String s : linesToRun) {
+      scala.tools.nsc.interpreter.Results.Result res = null;
+      try {
+        // prepend any previously incomplete lines so multi-line expressions work
+        res = imain.interpret(incomplete + s);
+      } catch (Exception e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
+      }
+
+      r = getResultCode(res);
+
+      if (r == Code.ERROR) {
+        // stop at the first error, returning whatever output was produced so far
+        return new InterpreterResult(r, out.toString());
+      } else if (r == Code.INCOMPLETE) {
+        incomplete += s + "\n";
+      } else {
+        incomplete = "";
+      }
+    }
+
+    if (r == Code.INCOMPLETE) {
+      return new InterpreterResult(r, "Incomplete expression");
+    } else {
+      return new InterpreterResult(r, out.toString());
+    }
+  }
+
+  /** Maps scala REPL result singletons onto zeppelin result codes. */
+  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+      return Code.SUCCESS;
+    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+      return Code.INCOMPLETE;
+    } else {
+      return Code.ERROR;
+    }
+  }
+
+
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // job cancellation is not supported
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    // progress reporting is not supported; always 0
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    // code completion is not implemented; return an empty suggestion list
+    return new LinkedList<String>();
+  }
+
+  /** Starts an in-process flink cluster and blocks until task managers register. */
+  private void startFlinkMiniCluster() {
+    localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
+    localFlinkCluster.waitForTaskManagersToBeRegistered();
+  }
+
+  private void stopFlinkMiniCluster() {
+    if (localFlinkCluster != null) {
+      localFlinkCluster.shutdown();
+      localFlinkCluster = null;
+    }
+  }
+
+  /** Returns o if it is a String; any non-String value becomes an empty string. */
+  static final String toString(Object o) {
+    return (o instanceof String) ? (String) o : "";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
new file mode 100644
index 0000000..efc4951
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+
+/**
+ * This class was copied from flink-scala-shell. Once Flink 0.9 is published in
+ * the Maven repository, this class can be removed.
+ *
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from
+ * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans
+ * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan <pc...@bea.com>
+ */
+public class JarHelper {
+  // ========================================================================
+  // Constants
+
+  // copy buffer size in bytes (value inherited from the original xmlbeans source)
+  private static final int BUFFER_SIZE = 2156;
+
+  // ========================================================================
+  // Variables
+
+  private byte[] mBuffer = new byte[BUFFER_SIZE];
+  private int mByteCount = 0;
+  private boolean mVerbose = false;
+  private String mDestJarName = "";   // canonical path of the jar being written, used for self-skip
+
+  // ========================================================================
+  // Constructor
+
+  /**
+   * Instantiates a new JarHelper.
+   */
+  public JarHelper() {
+  }
+
+  // ========================================================================
+  // Public methods
+
+  /**
+   * Jars a given directory or single file into a JarOutputStream.
+   */
+  public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
+
+    if (dirOrFile2Jar == null || destJar == null) {
+      throw new IllegalArgumentException();
+    }
+
+    // remember the destination so the recursion can skip jarring the jar into itself
+    mDestJarName = destJar.getCanonicalPath();
+    FileOutputStream fout = new FileOutputStream(destJar);
+    JarOutputStream jout = new JarOutputStream(fout);
+    // jout.setLevel(0);
+    try {
+      jarDir(dirOrFile2Jar, jout, null);
+    } catch (IOException ioe) {
+      throw ioe;
+    } finally {
+      jout.close();
+      fout.close();
+    }
+  }
+
+  /**
+   * Unjars a given jar file into a given directory.
+   */
+  public void unjarDir(File jarFile, File destDir) throws IOException {
+    // NOTE(review): "dest" is unused here; "fis" is closed indirectly when unjar()
+    // closes its JarInputStream wrapper, but leaks if unjar() fails early
+    BufferedOutputStream dest = null;
+    FileInputStream fis = new FileInputStream(jarFile);
+    unjar(fis, destDir);
+  }
+
+  /**
+   * Given an InputStream on a jar file, unjars the contents into the given
+   * directory.
+   */
+  public void unjar(InputStream in, File destDir) throws IOException {
+    BufferedOutputStream dest = null;
+    JarInputStream jis = new JarInputStream(in);
+    JarEntry entry;
+    while ((entry = jis.getNextJarEntry()) != null) {
+      if (entry.isDirectory()) {
+        // recreate directory entries, preserving their timestamps when available
+        File dir = new File(destDir, entry.getName());
+        dir.mkdir();
+        if (entry.getTime() != -1) {
+          dir.setLastModified(entry.getTime());
+        }
+        continue;
+      }
+      int count;
+      byte[] data = new byte[BUFFER_SIZE];
+      File destFile = new File(destDir, entry.getName());
+      if (mVerbose) {
+        System.out
+            .println("unjarring " + destFile + " from " + entry.getName());
+      }
+      FileOutputStream fos = new FileOutputStream(destFile);
+      dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+      while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+        dest.write(data, 0, count);
+      }
+      dest.flush();
+      dest.close();
+      if (entry.getTime() != -1) {
+        destFile.setLastModified(entry.getTime());
+      }
+    }
+    jis.close();
+  }
+
+  /** Enables or disables progress logging to stdout. */
+  public void setVerbose(boolean b) {
+    mVerbose = b;
+  }
+
+  // ========================================================================
+  // Private methods
+
+  // jar entries always use '/' as separator, regardless of platform
+  private static final char SEP = '/';
+
+  /**
+   * Recursively jars up the given path under the given directory.
+   */
+  private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+      throws IOException {
+    if (mVerbose) {
+      System.out.println("checking " + dirOrFile2jar);
+    }
+    if (dirOrFile2jar.isDirectory()) {
+      String[] dirList = dirOrFile2jar.list();
+      // path == null marks the root directory, whose own name is not recorded
+      String subPath = (path == null) ? ""
+          : (path + dirOrFile2jar.getName() + SEP);
+      if (path != null) {
+        JarEntry je = new JarEntry(subPath);
+        je.setTime(dirOrFile2jar.lastModified());
+        jos.putNextEntry(je);
+        jos.flush();
+        jos.closeEntry();
+      }
+      for (int i = 0; i < dirList.length; i++) {
+        File f = new File(dirOrFile2jar, dirList[i]);
+        jarDir(f, jos, subPath);
+      }
+    } else {
+      // never add the destination jar to itself
+      if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
+        if (mVerbose) {
+          System.out.println("skipping " + dirOrFile2jar.getPath());
+        }
+        return;
+      }
+
+      if (mVerbose) {
+        System.out.println("adding " + dirOrFile2jar.getPath());
+      }
+      FileInputStream fis = new FileInputStream(dirOrFile2jar);
+      try {
+        JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
+        entry.setTime(dirOrFile2jar.lastModified());
+        jos.putNextEntry(entry);
+        while ((mByteCount = fis.read(mBuffer)) != -1) {
+          jos.write(mBuffer, 0, mByteCount);
+          if (mVerbose) {
+            System.out.println("wrote " + mByteCount + " bytes");
+          }
+        }
+        jos.flush();
+        jos.closeEntry();
+      } catch (IOException ioe) {
+        throw ioe;
+      } finally {
+        fis.close();
+      }
+    }
+  }
+
+  // for debugging
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      System.err.println("Usage: JarHelper jarname.jar directory");
+      return;
+    }
+
+    JarHelper jarHelper = new JarHelper();
+    jarHelper.mVerbose = true;
+
+    File destJar = new File(args[0]);
+    File dirOrFile2Jar = new File(args[1]);
+
+    jarHelper.jarDir(dirOrFile2Jar, destJar);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
new file mode 100644
index 0000000..264008a
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FlinkInterpreterTest {
+
+  // a single interpreter instance is shared across all tests in this class
+  private static FlinkInterpreter flink;
+  private static InterpreterContext context;
+
+  @BeforeClass
+  public static void setUp() {
+    // no properties are set, so the interpreter relies on its registered defaults
+    Properties p = new Properties();
+    flink = new FlinkInterpreter(p);
+    flink.open();
+    context = new InterpreterContext(null, null, null, null, null, null, null);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    flink.close();
+    flink.destroy();
+  }
+
+  @Test
+  public void testSimpleStatement() {
+    // define a value, then verify that printing it yields the printed text
+    InterpreterResult result = flink.interpret("val a=1", context);
+    result = flink.interpret("print(a)", context);
+    assertEquals("1", result.message());
+  }
+
+
+  @Test
+  public void testWordCount() {
+    // classic word count over a single-element data set; only the final
+    // env.execute() result code is asserted, not the counts themselves
+    flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
+    flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
+    flink.interpret("counts.print()", context);
+    InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
+    assertEquals(Code.SUCCESS, result.code());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bbde084..dd3c2a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
     <module>shell</module>
     <module>hive</module>
     <module>tajo</module>
+    <module>flink</module>
     <module>zeppelin-web</module>
     <module>zeppelin-server</module>
     <module>zeppelin-distribution</module>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index bbf46fc..78a463c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -389,7 +389,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.angular.AngularInterpreter,"
         + "org.apache.zeppelin.shell.ShellInterpreter,"
         + "org.apache.zeppelin.hive.HiveInterpreter,"
-        + "org.apache.zeppelin.tajo.TajoInterpreter"),
+        + "org.apache.zeppelin.tajo.TajoInterpreter,"
+        + "org.apache.zeppelin.flink.FlinkInterpreter"),
         ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
         ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
         ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),