Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/02 04:06:19 UTC

incubator-zeppelin git commit: Add a (local mode) Scalding Interpreter to Zeppelin

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 45ce8a288 -> 8fdaaba94


Add a (local mode) Scalding Interpreter to Zeppelin

### What is this PR for?
Scalding (https://github.com/twitter/scalding) is a Scala library for writing MapReduce jobs.
This issue tracks the addition of a Scalding interpreter for Zeppelin. To keep this work incremental, this PR will focus on just a local mode implementation. The Hadoop mode can be a subsequent addition.

### What type of PR is it?
Feature

### Todos
* Addition of Hadoop mode for Scalding

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-526

### How should this be tested?
Run the tests in: scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java

### Screenshots
<img width="1167" alt="scalding-example" src="https://cloud.githubusercontent.com/assets/1509691/11944979/8788d5c2-a7ff-11e5-9863-2a1216c51896.png">
<img width="1151" alt="scalding-screenshot" src="https://cloud.githubusercontent.com/assets/1509691/11944978/8787e3ec-a7ff-11e5-8383-456adb16b977.png">

### Questions:
* This could use documentation, which could just be the example in the screenshot. Where can I contribute that?

Author: Sriram Krishnan <sk...@twitter.com>

Closes #561 from sriramkrishnan/scalding and squashes the following commits:

ffa698b [Sriram Krishnan] Whitespace cleanup
1ad405b [Sriram Krishnan] Adding newline to remove redundant change in PR
8eec3c2 [Sriram Krishnan] Updating docs to include the -Pscalding profile for Scalding
006500d [Sriram Krishnan] Reverting all commits to LICENSE to be back to master
bc31d1e [Sriram Krishnan] Getting rid of added licenses
b30725f [Sriram Krishnan] Making the Scalding interpreter optional as part of a new -Pscalding profile
9a7d733 [Sriram Krishnan] Moved tukanni license to a separate section and added license
dd0bb9a [Sriram Krishnan] Changing licenses to text format
aaae5d1 [Sriram Krishnan] Moving licenses to the right location
8be8d22 [Sriram Krishnan] Went thru and added all licenses I could find
6019cc8 [Sriram Krishnan] More licenses. Only remaining ones are the dependencies of hadoop-common.
460658a [Sriram Krishnan] Adding Cascading dependencies
dd8a4c8 [Sriram Krishnan] Adding Scalding licenses
083f059 [Sriram Krishnan] Trimming deps down from hadoop-client to just hadoop-common. Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode.
5c8056c [Sriram Krishnan] Making the Scalding scala jars same as the Spark ones for consistency
d4cf308 [Sriram Krishnan] Adding docs for the Scalding interpreter
8004b39 [Sriram Krishnan] Fixing a typo
91b0692 [Sriram Krishnan] adding ScaldingInterpreter
5fd1ae4 [Sriram Krishnan] Address comments on PR. Merge remote-tracking branch 'upstream/master' into scalding
7a9ceeb [Sriram Krishnan] Adding some tests for the Scalding interpreter
7ec2941 [Sriram Krishnan] More code cleanup
368dc04 [Sriram Krishnan] Cleaning up imports, comments, etc
c27ec48 [Sriram Krishnan] Formatting, license
8944b0c [Sriram Krishnan] Merge remote-tracking branch 'upstream/master' into scalding
36a2dac [Sriram Krishnan] Added a link to the scalding code where the ILoop was lifted from.
d3916b7 [Sriram Krishnan] Adding modified version of ScaldingILoop for grabbing Console output - will want to move this to Scalding itself
1ffbb3b [Sriram Krishnan] Fixing output of stdout from console
b19fda4 [Sriram Krishnan] More cleanup - flushing output stream. Still can't seem to get the Scala console output. Need to figure that out.
e13576f [Sriram Krishnan] Now seem to be getting the console out, but only for last line. Will need some debugging.
35fc032 [Sriram Krishnan] Initial version of a ScaldingInterpreter running in local mode. Need to get console output next. And add tests, and make it work for HDFS.
721dcb7 [Sriram Krishnan] Getting a basic interpreter going. Next step is to hook in the Scalding REPL.


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/8fdaaba9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/8fdaaba9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/8fdaaba9

Branch: refs/heads/master
Commit: 8fdaaba94584ec5c9b8b5d8acb96016700720af1
Parents: 45ce8a2
Author: Sriram Krishnan <sk...@twitter.com>
Authored: Wed Dec 30 22:25:49 2015 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Jan 1 19:08:07 2016 -0800

----------------------------------------------------------------------
 README.md                                       |   5 +
 conf/zeppelin-site.xml.template                 |   2 +-
 docs/_includes/themes/zeppelin/_navigation.html |   1 +
 .../docs-img/scalding-InterpreterBinding.png    | Bin 0 -> 13029 bytes
 .../docs-img/scalding-InterpreterSelection.png  | Bin 0 -> 24926 bytes
 .../zeppelin/img/docs-img/scalding-pie.png      | Bin 0 -> 98697 bytes
 docs/docs.md                                    |   1 +
 docs/interpreter/scalding.md                    |  78 +++++
 pom.xml                                         |   7 +
 scalding/pom.xml                                | 202 +++++++++++++
 .../zeppelin/scalding/ScaldingInterpreter.java  | 288 +++++++++++++++++++
 .../zeppelin/scalding/ScaldingILoop.scala       | 111 +++++++
 .../scalding/ScaldingInterpreterTest.java       | 130 +++++++++
 .../zeppelin/conf/ZeppelinConfiguration.java    |   3 +-
 14 files changed, 826 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 85fc0b6..9a99b78 100644
--- a/README.md
+++ b/README.md
@@ -153,6 +153,11 @@ mvn clean package -Pspark-1.5 -Pmapr50 -DskipTests
 mvn clean package -Dignite.version=1.1.0-incubating -DskipTests
 ```
 
+#### Scalding Interpreter
+
+```
+mvn clean package -Pscalding -DskipTests
+```
 
 ### Configure
 If you wish to configure Zeppelin option (like port number), configure the following files:

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index b6aca75..74fa2e7 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -105,7 +105,7 @@
 
 <property>
   <name>zeppelin.interpreters</name>
-  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter</value>
+  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter become a default</description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 2c62282..5b2da51 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -45,6 +45,7 @@
                 <li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, hawq</a></li>
+                <li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
                 <li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
                 <li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png
new file mode 100644
index 0000000..1131310
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png
new file mode 100644
index 0000000..c52f4e3
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png
new file mode 100644
index 0000000..bb01025
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/docs.md
----------------------------------------------------------------------
diff --git a/docs/docs.md b/docs/docs.md
index a2347a8..b70ee58 100644
--- a/docs/docs.md
+++ b/docs/docs.md
@@ -41,6 +41,7 @@ limitations under the License.
 * [lens](./interpreter/lens.html)
 * [md](./interpreter/markdown.html)
 * [postgresql, hawq](./interpreter/postgresql.html)
+* [scalding](./interpreter/scalding.html)
 * [sh](./pleasecontribute.html)
 * [spark](./interpreter/spark.html)
 * [tajo](./pleasecontribute.html)

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/interpreter/scalding.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md
new file mode 100644
index 0000000..40ec8b1
--- /dev/null
+++ b/docs/interpreter/scalding.md
@@ -0,0 +1,78 @@
+---
+layout: page
+title: "Scalding Interpreter"
+description: ""
+group: manual
+---
+{% include JB/setup %}
+
+
+## Scalding Interpreter for Apache Zeppelin
+[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs.
+
+### Building the Scalding Interpreter
+You first have to build the Scalding interpreter by enabling the **scalding** profile as follows:
+
+```
+mvn clean package -Pscalding -DskipTests
+```
+
+### Enabling the Scalding Interpreter
+
+In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon, select **Scalding**, and hit **Save**.
+ 
+ <center>
+ ![Interpreter Binding](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png)
+
+ ![Interpreter Selection](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png)
+ </center>
+
+### Configuring the Interpreter
+Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
+
+### Testing the Interpreter
+
+As an example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!) and plot a graph of the top 10 words in the book.
+
+```
+%scalding
+
+import scala.io.Source
+
+// Get the Alice in Wonderland book from gutenberg.org:
+val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines
+val aliceLineNum = alice.zipWithIndex.toList
+val alicePipe = TypedPipe.from(aliceLineNum)
+
+// Now get a list of words for the book:
+val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList }
+
+// Now let's add a count for each word:
+val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) }
+
+// let's sum them for each word:
+val wordCount = aliceWithCount.group.sum
+
+print ("Here are the top 10 words\n")
+val top10 = wordCount
+  .groupAll
+  .sortBy { case (word, count) => -count }
+  .take(10)
+top10.dump
+
+```
+```
+%scalding
+
+val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n")
+print("%table " + table)
+
+```
+
+If you click on the icon for the pie chart, you should be able to see a chart like this:
+![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png)
+
+### Current Status & Future Work
+The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates. 
+
+The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5e492fa..88d38aa 100755
--- a/pom.xml
+++ b/pom.xml
@@ -628,6 +628,13 @@
     </profile>
 
     <profile>
+      <id>scalding</id>
+      <modules>
+        <module>scalding</module>
+      </modules>
+    </profile>
+
+    <profile>
       <id>build-distr</id>
       <activation>
         <activeByDefault>false</activeByDefault>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/pom.xml
----------------------------------------------------------------------
diff --git a/scalding/pom.xml b/scalding/pom.xml
new file mode 100644
index 0000000..abc1e2b
--- /dev/null
+++ b/scalding/pom.xml
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.6.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-scalding</artifactId>
+  <packaging>jar</packaging>
+  <version>0.6.0-incubating-SNAPSHOT</version>
+  <name>Zeppelin: Scalding interpreter</name>
+  <url>http://zeppelin.incubator.apache.org</url>
+
+  <properties>
+    <scala.version>2.10.4</scala.version>
+    <hadoop.version>2.3.0</hadoop.version>
+    <scalding.version>0.15.1-RC13</scalding.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>conjars</id>
+      <name>Concurrent Maven Repo</name>
+      <url>http://conjars.org/repo</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-exec</artifactId>
+      <version>1.3</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-core_2.10</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-repl_2.10</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>2.7</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>            
+        <executions> 
+          <execution> 
+            <id>enforce</id> 
+            <phase>none</phase> 
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-artifact</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>${project.artifactId}</artifactId>
+                  <version>${project.version}</version>
+                  <type>${project.packaging}</type>
+                </artifactItem>
+              </artifactItems>              
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Plugin to compile Scala code -->
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>test-compile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
----------------------------------------------------------------------
diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
new file mode 100644
index 0000000..d43417e
--- /dev/null
+++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Console;
+import scala.Some;
+import scala.None;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Scalding interpreter for Zeppelin. Based off the Spark interpreter code.
+ *
+ */
+public class ScaldingInterpreter extends Interpreter {
+  Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class);
+
+  public static final List<String> NO_COMPLETION = 
+    Collections.unmodifiableList(new ArrayList<String>());
+
+  static {
+    Interpreter.register("scalding", ScaldingInterpreter.class.getName());
+  }
+
+  private ScaldingILoop interpreter;
+  private ByteArrayOutputStream out;
+  private Map<String, Object> binder;
+
+  public ScaldingInterpreter(Properties property) {
+    super(property);
+    out = new ByteArrayOutputStream();
+  }
+
+  @Override
+  public void open() {
+    URL[] urls = getClassloaderUrls();
+
+    // Very nice discussion about how the Scala compiler handles the classpath
+    // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
+
+    /*
+     * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
+     * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
+     * nsc.Settings.classpath.
+     *
+     * >> val settings = new Settings() >> settings.usejavacp.value = true >>
+     * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
+     * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
+     * getClass.getClassLoader >> } >> in.setContextClassLoader()
+     */
+    Settings settings = new Settings();
+
+    // set classpath for scala compiler
+    PathSetting pathSettings = settings.classpath();
+    String classpath = "";
+    List<File> paths = currentClassPath();
+    for (File f : paths) {
+      if (classpath.length() > 0) {
+        classpath += File.pathSeparator;
+      }
+      classpath += f.getAbsolutePath();
+    }
+
+    if (urls != null) {
+      for (URL u : urls) {
+        if (classpath.length() > 0) {
+          classpath += File.pathSeparator;
+        }
+        classpath += u.getFile();
+      }
+    }
+
+    pathSettings.v_$eq(classpath);
+    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+
+
+    // set classloader for scala compiler
+    settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
+        .getContextClassLoader()));
+    BooleanSetting b = (BooleanSetting) settings.usejavacp();
+    b.v_$eq(true);
+    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+    /* Scalding interpreter */
+    PrintStream printStream = new PrintStream(out);
+    interpreter = new ScaldingILoop(null, new PrintWriter(out));
+    interpreter.settings_$eq(settings);
+    interpreter.createInterpreter();
+
+    interpreter.intp().
+      interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+    binder = (Map<String, Object>) getValue("_binder");
+    binder.put("out", printStream);
+  }
+
+  private Object getValue(String name) {
+    Object ret = interpreter.intp().valueOfTerm(name);
+    if (ret instanceof None) {
+      return null;
+    } else if (ret instanceof Some) {
+      return ((Some) ret).get();
+    } else {
+      return ret;
+    }
+  }
+
+  private List<File> currentClassPath() {
+    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+    if (cps != null) {
+      for (String cp : cps) {
+        paths.add(new File(cp));
+      }
+    }
+    return paths;
+  }
+
+  private List<File> classPath(ClassLoader cl) {
+    List<File> paths = new LinkedList<File>();
+    if (cl == null) {
+      return paths;
+    }
+
+    if (cl instanceof URLClassLoader) {
+      URLClassLoader ucl = (URLClassLoader) cl;
+      URL[] urls = ucl.getURLs();
+      if (urls != null) {
+        for (URL url : urls) {
+          paths.add(new File(url.getFile()));
+        }
+      }
+    }
+    return paths;
+  }
+
+  @Override
+  public void close() {
+    interpreter.intp().close();
+  }
+
+
+  @Override
+  public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
+    logger.info("Running Scalding command '" + cmd + "'");
+
+    if (cmd == null || cmd.trim().length() == 0) {
+      return new InterpreterResult(Code.SUCCESS);
+    }
+    return interpret(cmd.split("\n"), contextInterpreter);
+  }
+
+  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+    synchronized (this) {
+      InterpreterResult r = interpretInput(lines);
+      return r;
+    }
+  }
+
+  public InterpreterResult interpretInput(String[] lines) {
+
+    // add print("") to make sure not finishing with comment
+    // see https://github.com/NFLabs/zeppelin/issues/151
+    String[] linesToRun = new String[lines.length + 1];
+    for (int i = 0; i < lines.length; i++) {
+      linesToRun[i] = lines[i];
+    }
+    linesToRun[lines.length] = "print(\"\")";
+
+    Console.setOut((java.io.PrintStream) binder.get("out"));
+    out.reset();
+    Code r = null;
+    String incomplete = "";
+
+    for (int l = 0; l < linesToRun.length; l++) {
+      String s = linesToRun[l];
+      // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
+      if (l + 1 < linesToRun.length) {
+        String nextLine = linesToRun[l + 1].trim();
+        if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
+          incomplete += s + "\n";
+          continue;
+        }
+      }
+
+      scala.tools.nsc.interpreter.Results.Result res = null;
+      try {
+        res = interpreter.intp().interpret(incomplete + s);
+      } catch (Exception e) {
+        logger.error("Interpreter exception: ", e);
+        return new InterpreterResult(Code.ERROR, e.getMessage());
+      }
+
+      r = getResultCode(res);
+
+      if (r == Code.ERROR) {
+        Console.flush();
+        return new InterpreterResult(r, out.toString());
+      } else if (r == Code.INCOMPLETE) {
+        incomplete += s + "\n";
+      } else {
+        incomplete = "";
+      }
+    }
+
+    if (r == Code.INCOMPLETE) {
+      return new InterpreterResult(r, "Incomplete expression");
+    } else {
+      Console.flush();
+      return new InterpreterResult(r, out.toString());
+    }
+  }
+
+  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+      return Code.SUCCESS;
+    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+      return Code.INCOMPLETE;
+    } else {
+      return Code.ERROR;
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // not implemented
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    // fine-grained progress not implemented - return 0
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+        ScaldingInterpreter.class.getName() + this.hashCode());
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return NO_COMPLETION;
+  }
+}
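
The line-buffering rule in the `interpret` loop above can be sketched in isolation. This is a minimal illustration, not the committed code: the class `ContinuationGrouping` and its method names are hypothetical, and the sketch deliberately leaves out the Scala interpreter call and the `Code.INCOMPLETE` accumulation. It only shows which lines end up being submitted together when the following line begins with a single ".".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the Zeppelin commit.
class ContinuationGrouping {
  // True when a line looks like a chained call continuing the previous line:
  // it starts with "." but not with ".." or "./".
  static boolean isInvocationContinuation(String line) {
    String t = line.trim();
    return t.startsWith(".") && !t.startsWith("..") && !t.startsWith("./");
  }

  // Groups raw lines into the chunks that would be handed to the interpreter together.
  static List<String> group(String[] lines) {
    List<String> chunks = new ArrayList<>();
    StringBuilder pending = new StringBuilder();
    for (int i = 0; i < lines.length; i++) {
      boolean nextContinues =
          i + 1 < lines.length && isInvocationContinuation(lines[i + 1]);
      if (nextContinues) {
        // Hold this line back; it is only complete once the next line is appended.
        pending.append(lines[i]).append("\n");
        continue;
      }
      chunks.add(pending + lines[i]);
      pending.setLength(0);
    }
    return chunks;
  }
}
```

Under these assumptions, the input lines `"123"`, `.toInt`, `val c = 1` produce two chunks: the first two lines joined by a newline, then the `val` definition on its own. This corresponds to the case exercised by `testNextLineInvocation`.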

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
new file mode 100644
index 0000000..bd23c49
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import java.io.{BufferedReader, File, FileReader}
+
+import scala.tools.nsc.GenericRunnerSettings
+import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter}
+
+
+/**
+ * A class providing Scalding specific commands for inclusion in the Scalding REPL.
+ * This is currently forked from Scalding, but should eventually make it into Scalding itself:
+ * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala
+ */
+ class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter)
+    extends ILoop(in0, out) {
+  // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
+  // def this() = this(None, new JPrintWriter(Console.out, true))
+
+  settings = new GenericRunnerSettings({ s => echo(s) })
+
+  override def printWelcome() {
+    val fc = Console.YELLOW
+    val wc = Console.RED
+    def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc)
+    echo(fc +
+      " (                                           \n" +
+      " )\\ )            (   (                       \n" +
+      "(()/(         )  )\\  )\\ )  (          (  (   \n" +
+      " /(_)) (   ( /( ((_)(()/( )\\   (     )\\))(  \n" +
+      "(_))   )\\  )( )) _   ((_)(( )  )\\ ) (( ))\\  \n".replaceAll("_", wc + "_" + fc) + wc +
+      wrapFlames("/ __|((_) ((_)_ | |  _| | (_) _(_(( (_()_) \n") +
+      wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\  \n") +
+      "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, |  \n" +
+      "                                    |___/   ")
+  }
+
+  /**
+   * Commands specific to the Scalding REPL. To define a new command use one of the following
+   * factory methods:
+   * - `LoopCommand.nullary` for commands that take no arguments
+   * - `LoopCommand.cmd` for commands that take one string argument
+   * - `LoopCommand.varargs` for commands that take multiple string arguments
+   */
+  private val scaldingCommands: List[LoopCommand] = List()
+
+  /**
+   * Change the shell prompt to read scalding&gt;
+   *
+   * @return a prompt string to use for this REPL.
+   */
+  override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET
+
+  private[this] def addImports(ids: String*): IR.Result =
+    if (ids.isEmpty) IR.Success
+    else intp.interpret("import " + ids.mkString(", "))
+
+  /**
+   * Search for files with the given name in all directories from current directory
+   * up to root.
+   */
+  private def findAllUpPath(filename: String): List[File] =
+    Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent)
+      .takeWhile(_ != "/")
+      .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename)))
+      .toList
+
+  /**
+   * Gets the list of commands that this REPL supports.
+   *
+   * @return a list of the commands supported by this REPL.
+   */
+  override def commands: List[LoopCommand] = super.commands ++ scaldingCommands
+
+  protected def imports: List[String] = List(
+    "com.twitter.scalding._",
+    "com.twitter.scalding.ReplImplicits._",
+    "com.twitter.scalding.ReplImplicitContext._",
+    "com.twitter.scalding.ReplState._")
+
+  override def createInterpreter() {
+    super.createInterpreter()
+    intp.beQuietDuring {
+      addImports(imports: _*)
+
+      settings match {
+        case s: GenericRunnerSettings =>
+          findAllUpPath(".scalding_repl").reverse.foreach {
+            f => s.loadfiles.appendToValue(f.toString)
+          }
+        case _ => ()
+      }
+    }
+  }
+}
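
The upward directory search that `findAllUpPath` performs can also be written with explicit null checks. The following is a rough Java sketch of the same idea, not a translation of the committed Scala: the class name `FindUpPath` and the `start`/`suffix` parameters are hypothetical, the sketch matches on the file name rather than on the full path string, and unlike the Scala version it also inspects the filesystem root. It assumes `File.listFiles()` may return null for unreadable directories and skips those.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the Zeppelin commit.
class FindUpPath {
  // Collects entries whose name ends with `suffix`, starting at `start`
  // and walking up through each parent directory until there is none.
  static List<File> findAllUpPath(File start, String suffix) {
    List<File> found = new ArrayList<>();
    for (File dir = start.getAbsoluteFile(); dir != null; dir = dir.getParentFile()) {
      File[] entries = dir.listFiles(); // null if dir is unreadable or not a directory
      if (entries == null) {
        continue;
      }
      for (File f : entries) {
        if (f.getName().endsWith(suffix)) {
          found.add(f);
        }
      }
    }
    return found;
  }
}
```

Results are returned nearest directory first, which is why a caller that wants outermost settings loaded first (as `createInterpreter` does with `.reverse`) would reverse the list before use.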

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
new file mode 100644
index 0000000..7a753fa
--- /dev/null
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Tests for the Scalding interpreter for Zeppelin.
+ *
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ScaldingInterpreterTest {
+  public static ScaldingInterpreter repl;
+  private InterpreterContext context;
+  private File tmpDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
+    System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
+
+    tmpDir.mkdirs();
+
+    if (repl == null) {
+      Properties p = new Properties();
+
+      repl = new ScaldingInterpreter(p);
+      repl.open();
+    }
+
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    context = new InterpreterContext("note", "id", "title", "text",
+        new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
+            intpGroup.getId(), null),
+        new LinkedList<InterpreterContextRunner>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    delete(tmpDir);
+    repl.close();
+  }
+
+  private void delete(File file) {
+    if (file.isFile()) file.delete();
+    else if (file.isDirectory()) {
+      File[] files = file.listFiles();
+      if (files != null && files.length > 0) {
+        for (File f : files) {
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testBasicIntp() {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("val a = 1\nval b = 2", context).code());
+
+    // interpreting an incomplete expression should return INCOMPLETE
+    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+    assertTrue(incomplete.message().length() > 0); // expecting some error
+                                                   // message
+  }
+
+  @Test
+  public void testBasicScalding() {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("case class Sale(state: String, name: String, sale: Int)\n" +
+          "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", \"A\", 20), Sale(\"VA\", \"B\", 15))\n" +
+          "val salesPipe = TypedPipe.from(salesList)\n" +
+          "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" +
+          "    groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" +
+          "results.dump",
+          context).code());
+  }
+
+  @Test
+  public void testNextLineInvocation() {
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
+  }
+
+  @Test
+  public void testEndWithComment() {
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+  }
+
+  @Test
+  public void testReferencingUndefinedVal() {
+    InterpreterResult result = repl.interpret("def category(min: Int) = {"
+        + "    if (0 <= value) \"error\"" + "}", context);
+    assertEquals(Code.ERROR, result.code());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 87d1c20..ed3b8c0 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -411,7 +411,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
         + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
         + "org.apache.zeppelin.kylin.KylinInterpreter,"
-        + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter"),
+        + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+        + "org.apache.zeppelin.scalding.ScaldingInterpreter"),
     ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),