Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/01/02 04:06:19 UTC
incubator-zeppelin git commit: Add a (local mode) Scalding Interpreter to Zeppelin
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 45ce8a288 -> 8fdaaba94
Add a (local mode) Scalding Interpreter to Zeppelin
### What is this PR for?
Scalding (https://github.com/twitter/scalding) is a Scala library for writing MapReduce jobs.
This issue tracks the addition of a Scalding interpreter for Zeppelin. To keep this work incremental, this PR will focus on just a local mode implementation. The Hadoop mode can be a subsequent addition.
### What type of PR is it?
Feature
### Todos
* Addition of Hadoop mode for Scalding
### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-526
### How should this be tested?
Run the tests in: scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
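A minimal sketch of one way to run only that test class from the repository root. It assumes the `scalding` profile added by this PR is what brings the module into the build, and that the standard Maven options `-pl`/`-am` and the Surefire properties `-Dtest`/`-DfailIfNoTests` behave as usual with this project's plugin versions; adjust as needed.

```shell
# Assumption: run from the repository root with the scalding profile enabled.
# -pl scalding -am   builds the scalding module plus the modules it depends on
# -Dtest=...         restricts Surefire to the Scalding interpreter test class
# -DfailIfNoTests=false keeps the upstream modules from failing when the filter matches nothing
mvn test -Pscalding -pl scalding -am -Dtest=ScaldingInterpreterTest -DfailIfNoTests=false
```

Running `mvn clean package -Pscalding` without `-DskipTests` should also exercise these tests, at the cost of running the rest of the suite.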
### Screenshots
<img width="1167" alt="scalding-example" src="https://cloud.githubusercontent.com/assets/1509691/11944979/8788d5c2-a7ff-11e5-9863-2a1216c51896.png">
<img width="1151" alt="scalding-screenshot" src="https://cloud.githubusercontent.com/assets/1509691/11944978/8787e3ec-a7ff-11e5-8383-456adb16b977.png">
### Questions:
* This could use documentation, which could just be the example in the screenshot. Where can I contribute that?
Author: Sriram Krishnan <sk...@twitter.com>
Closes #561 from sriramkrishnan/scalding and squashes the following commits:
ffa698b [Sriram Krishnan] Whitespace cleanup
1ad405b [Sriram Krishnan] Adding newline to remove redundant change in PR
8eec3c2 [Sriram Krishnan] Updating docs to include the -Pscalding profile for Scalding
006500d [Sriram Krishnan] Reverting all commits to LICENSE to be back to master
bc31d1e [Sriram Krishnan] Getting rid of added licenses
b30725f [Sriram Krishnan] Making the Scalding interpreter optional as part of a new -Pscalding profile
9a7d733 [Sriram Krishnan] Moved tukanni license to a separate section and added license
dd0bb9a [Sriram Krishnan] Changing licenses to text format
aaae5d1 [Sriram Krishnan] Moving licenses to the right location
8be8d22 [Sriram Krishnan] Went thru and added all licenses I could find
6019cc8 [Sriram Krishnan] More licenses. Only remaining ones are the dependencies of hadoop-common.
460658a [Sriram Krishnan] Adding Cascading dependencies
dd8a4c8 [Sriram Krishnan] Adding Scalding licenses
083f059 [Sriram Krishnan] Trimming deps down from hadoop-client to just hadoop-common. Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode.
5c8056c [Sriram Krishnan] Making the Scalding scala jars same as the Spark ones for consistency
d4cf308 [Sriram Krishnan] Adding docs for the Scalding interpreter
8004b39 [Sriram Krishnan] Fixing a typo
91b0692 [Sriram Krishnan] adding ScaldingInterpreter
5fd1ae4 [Sriram Krishnan] Address comments on PR. Merge remote-tracking branch 'upstream/master' into scalding
7a9ceeb [Sriram Krishnan] Adding some tests for the Scalding interpreter
7ec2941 [Sriram Krishnan] More code cleanup
368dc04 [Sriram Krishnan] Cleaning up imports, comments, etc
c27ec48 [Sriram Krishnan] Formatting, license
8944b0c [Sriram Krishnan] Merge remote-tracking branch 'upstream/master' into scalding
36a2dac [Sriram Krishnan] Added a link to the scalding code where the ILoop was lifted from.
d3916b7 [Sriram Krishnan] Adding modified version of ScaldingILoop for grabbing Console output - will want to move this to Scalding itself
1ffbb3b [Sriram Krishnan] Fixing output of stdout from console
b19fda4 [Sriram Krishnan] More cleanup - flushing output stream. Still can't seem to get the Scala console output. Need to figure that out.
e13576f [Sriram Krishnan] Now seem to be getting the console out, but only for last line. Will need some debugging.
35fc032 [Sriram Krishnan] Initial version of a ScaldingInterpreter running in local mode. Need to get console output next. And add tests, and make it work for HDFS.
721dcb7 [Sriram Krishnan] Getting a basic interpreter going. Next step is to hook in the Scalding REPL.
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/8fdaaba9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/8fdaaba9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/8fdaaba9
Branch: refs/heads/master
Commit: 8fdaaba94584ec5c9b8b5d8acb96016700720af1
Parents: 45ce8a2
Author: Sriram Krishnan <sk...@twitter.com>
Authored: Wed Dec 30 22:25:49 2015 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Jan 1 19:08:07 2016 -0800
----------------------------------------------------------------------
README.md | 5 +
conf/zeppelin-site.xml.template | 2 +-
docs/_includes/themes/zeppelin/_navigation.html | 1 +
.../docs-img/scalding-InterpreterBinding.png | Bin 0 -> 13029 bytes
.../docs-img/scalding-InterpreterSelection.png | Bin 0 -> 24926 bytes
.../zeppelin/img/docs-img/scalding-pie.png | Bin 0 -> 98697 bytes
docs/docs.md | 1 +
docs/interpreter/scalding.md | 78 +++++
pom.xml | 7 +
scalding/pom.xml | 202 +++++++++++++
.../zeppelin/scalding/ScaldingInterpreter.java | 288 +++++++++++++++++++
.../zeppelin/scalding/ScaldingILoop.scala | 111 +++++++
.../scalding/ScaldingInterpreterTest.java | 130 +++++++++
.../zeppelin/conf/ZeppelinConfiguration.java | 3 +-
14 files changed, 826 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 85fc0b6..9a99b78 100644
--- a/README.md
+++ b/README.md
@@ -153,6 +153,11 @@ mvn clean package -Pspark-1.5 -Pmapr50 -DskipTests
mvn clean package -Dignite.version=1.1.0-incubating -DskipTests
```
+#### Scalding Interpreter
+
+```
+mvn clean package -Pscalding -DskipTests
+```
### Configure
If you wish to configure Zeppelin option (like port number), configure the following files:
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index b6aca75..74fa2e7 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -105,7 +105,7 @@
<property>
<name>zeppelin.interpreters</name>
- <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter</value>
+ <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 2c62282..5b2da51 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -45,6 +45,7 @@
<li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li>
<li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
<li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, hawq</a></li>
+ <li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li>
<li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png
new file mode 100644
index 0000000..1131310
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png differ
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png
new file mode 100644
index 0000000..c52f4e3
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png differ
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png
new file mode 100644
index 0000000..bb01025
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/scalding-pie.png differ
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/docs.md
----------------------------------------------------------------------
diff --git a/docs/docs.md b/docs/docs.md
index a2347a8..b70ee58 100644
--- a/docs/docs.md
+++ b/docs/docs.md
@@ -41,6 +41,7 @@ limitations under the License.
* [lens](./interpreter/lens.html)
* [md](./interpreter/markdown.html)
* [postgresql, hawq](./interpreter/postgresql.html)
+* [scalding](./interpreter/scalding.html)
* [sh](./pleasecontribute.html)
* [spark](./interpreter/spark.html)
* [tajo](./pleasecontribute.html)
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/docs/interpreter/scalding.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md
new file mode 100644
index 0000000..40ec8b1
--- /dev/null
+++ b/docs/interpreter/scalding.md
@@ -0,0 +1,78 @@
+---
+layout: page
+title: "Scalding Interpreter"
+description: ""
+group: manual
+---
+{% include JB/setup %}
+
+
+## Scalding Interpreter for Apache Zeppelin
+[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs.
+
+### Building the Scalding Interpreter
+You first have to build the Scalding interpreter by enabling the **scalding** profile as follows:
+
+```
+mvn clean package -Pscalding -DskipTests
+```
+
+### Enabling the Scalding Interpreter
+
+In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon, select **Scalding**, and hit **Save**.
+
+ <center>
+ ![Interpreter Binding](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png)
+
+ ![Interpreter Selection](../assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png)
+ </center>
+
+### Configuring the Interpreter
+Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
+
+### Testing the Interpreter
+
+As an example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!) and plot a graph of the top 10 words in the book.
+
+```
+%scalding
+
+import scala.io.Source
+
+// Get the Alice in Wonderland book from gutenberg.org:
+val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines
+val aliceLineNum = alice.zipWithIndex.toList
+val alicePipe = TypedPipe.from(aliceLineNum)
+
+// Now get a list of words for the book:
+val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList }
+
+// Now let's add a count for each word:
+val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) }
+
+// let's sum them for each word:
+val wordCount = aliceWithCount.group.sum
+
+print ("Here are the top 10 words\n")
+val top10 = wordCount
+ .groupAll
+ .sortBy { case (word, count) => -count }
+ .take(10)
+top10.dump
+
+```
+```
+%scalding
+
+val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n")
+print("%table " + table)
+
+```
+
+If you click on the icon for the pie chart, you should be able to see a chart like this:
+![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png)
+
+### Current Status & Future Work
+The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates.
+
+The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5e492fa..88d38aa 100755
--- a/pom.xml
+++ b/pom.xml
@@ -628,6 +628,13 @@
</profile>
<profile>
+ <id>scalding</id>
+ <modules>
+ <module>scalding</module>
+ </modules>
+ </profile>
+
+ <profile>
<id>build-distr</id>
<activation>
<activeByDefault>false</activeByDefault>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/pom.xml
----------------------------------------------------------------------
diff --git a/scalding/pom.xml b/scalding/pom.xml
new file mode 100644
index 0000000..abc1e2b
--- /dev/null
+++ b/scalding/pom.xml
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.6.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-scalding</artifactId>
+ <packaging>jar</packaging>
+ <version>0.6.0-incubating-SNAPSHOT</version>
+ <name>Zeppelin: Scalding interpreter</name>
+ <url>http://zeppelin.incubator.apache.org</url>
+
+ <properties>
+ <scala.version>2.10.4</scala.version>
+ <hadoop.version>2.3.0</hadoop.version>
+ <scalding.version>0.15.1-RC13</scalding.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>conjars</id>
+ <name>Concurrent Maven Repo</name>
+ <url>http://conjars.org/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>zeppelin-interpreter</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-exec</artifactId>
+ <version>1.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-core_2.10</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-repl_2.10</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-artifact</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/scalding</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <type>${project.packaging}</type>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Plugin to compile Scala code -->
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
----------------------------------------------------------------------
diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
new file mode 100644
index 0000000..d43417e
--- /dev/null
+++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Console;
+import scala.Some;
+import scala.None;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Scalding interpreter for Zeppelin. Based off the Spark interpreter code.
+ *
+ */
+public class ScaldingInterpreter extends Interpreter {
+ Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class);
+
+ public static final List<String> NO_COMPLETION =
+ Collections.unmodifiableList(new ArrayList<String>());
+
+ static {
+ Interpreter.register("scalding", ScaldingInterpreter.class.getName());
+ }
+
+ private ScaldingILoop interpreter;
+ private ByteArrayOutputStream out;
+ private Map<String, Object> binder;
+
+ public ScaldingInterpreter(Properties property) {
+ super(property);
+ out = new ByteArrayOutputStream();
+ }
+
+ @Override
+ public void open() {
+ URL[] urls = getClassloaderUrls();
+
+ // Very nice discussion about how the Scala compiler handles the classpath
+ // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
+
+ /*
+ * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
+ * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
+ * nsc.Settings.classpath.
+ *
+ * >> val settings = new Settings() >> settings.usejavacp.value = true >>
+ * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
+ * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
+ * getClass.getClassLoader >> } >> in.setContextClassLoader()
+ */
+ Settings settings = new Settings();
+
+ // set classpath for scala compiler
+ PathSetting pathSettings = settings.classpath();
+ String classpath = "";
+ List<File> paths = currentClassPath();
+ for (File f : paths) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += f.getAbsolutePath();
+ }
+
+ if (urls != null) {
+ for (URL u : urls) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += u.getFile();
+ }
+ }
+
+ pathSettings.v_$eq(classpath);
+ settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+
+
+ // set classloader for scala compiler
+ settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
+ .getContextClassLoader()));
+ BooleanSetting b = (BooleanSetting) settings.usejavacp();
+ b.v_$eq(true);
+ settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+ /* Scalding interpreter */
+ PrintStream printStream = new PrintStream(out);
+ interpreter = new ScaldingILoop(null, new PrintWriter(out));
+ interpreter.settings_$eq(settings);
+ interpreter.createInterpreter();
+
+ interpreter.intp().
+ interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+ binder = (Map<String, Object>) getValue("_binder");
+ binder.put("out", printStream);
+ }
+
+ private Object getValue(String name) {
+ Object ret = interpreter.intp().valueOfTerm(name);
+ if (ret instanceof None) {
+ return null;
+ } else if (ret instanceof Some) {
+ return ((Some) ret).get();
+ } else {
+ return ret;
+ }
+ }
+
+ private List<File> currentClassPath() {
+ List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+ String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+ if (cps != null) {
+ for (String cp : cps) {
+ paths.add(new File(cp));
+ }
+ }
+ return paths;
+ }
+
+ private List<File> classPath(ClassLoader cl) {
+ List<File> paths = new LinkedList<File>();
+ if (cl == null) {
+ return paths;
+ }
+
+ if (cl instanceof URLClassLoader) {
+ URLClassLoader ucl = (URLClassLoader) cl;
+ URL[] urls = ucl.getURLs();
+ if (urls != null) {
+ for (URL url : urls) {
+ paths.add(new File(url.getFile()));
+ }
+ }
+ }
+ return paths;
+ }
+
+ @Override
+ public void close() {
+ interpreter.intp().close();
+ }
+
+
+ @Override
+ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
+ logger.info("Running Scalding command '" + cmd + "'");
+
+ if (cmd == null || cmd.trim().length() == 0) {
+ return new InterpreterResult(Code.SUCCESS);
+ }
+ return interpret(cmd.split("\n"), contextInterpreter);
+ }
+
+ public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+ synchronized (this) {
+ InterpreterResult r = interpretInput(lines);
+ return r;
+ }
+ }
+
+ public InterpreterResult interpretInput(String[] lines) {
+
+ // append print("") so the input never finishes with a comment
+ // see https://github.com/NFLabs/zeppelin/issues/151
+ String[] linesToRun = new String[lines.length + 1];
+ for (int i = 0; i < lines.length; i++) {
+ linesToRun[i] = lines[i];
+ }
+ linesToRun[lines.length] = "print(\"\")";
+
+ Console.setOut((java.io.PrintStream) binder.get("out"));
+ out.reset();
+ Code r = null;
+ String incomplete = "";
+
+ for (int l = 0; l < linesToRun.length; l++) {
+ String s = linesToRun[l];
+ // if the next line starts with "." (but not ".." or "./"), treat it as a chained invocation
+ if (l + 1 < linesToRun.length) {
+ String nextLine = linesToRun[l + 1].trim();
+ if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
+ incomplete += s + "\n";
+ continue;
+ }
+ }
+
+ scala.tools.nsc.interpreter.Results.Result res = null;
+ try {
+ res = interpreter.intp().interpret(incomplete + s);
+ } catch (Exception e) {
+ logger.error("Interpreter exception: ", e);
+ return new InterpreterResult(Code.ERROR, e.getMessage());
+ }
+
+ r = getResultCode(res);
+
+ if (r == Code.ERROR) {
+ Console.flush();
+ return new InterpreterResult(r, out.toString());
+ } else if (r == Code.INCOMPLETE) {
+ incomplete += s + "\n";
+ } else {
+ incomplete = "";
+ }
+ }
+
+ if (r == Code.INCOMPLETE) {
+ return new InterpreterResult(r, "Incomplete expression");
+ } else {
+ Console.flush();
+ return new InterpreterResult(r, out.toString());
+ }
+ }
+
+ private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+ if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+ return Code.SUCCESS;
+ } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+ return Code.INCOMPLETE;
+ } else {
+ return Code.ERROR;
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ // not implemented
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ // fine-grained progress not implemented - return 0
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ ScaldingInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List<String> completion(String buf, int cursor) {
+ return NO_COMPLETION;
+ }
+}
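The line-joining rule used by `interpretInput` above can be illustrated in isolation. The following is a minimal sketch and not part of the commit: the class `ContinuationJoiner` and its `join` method are hypothetical names, and the sketch only groups lines. It does not call the Scala interpreter or handle the `INCOMPLETE` result code.

```java
import java.util.ArrayList;
import java.util.List;

public class ContinuationJoiner {
  // Hypothetical helper: a line is held back and joined with the following
  // one whenever the following line starts with a single "." (a chained
  // method call), but not with ".." or "./".
  static List<String> join(String[] lines) {
    List<String> statements = new ArrayList<>();
    StringBuilder pending = new StringBuilder();
    for (int i = 0; i < lines.length; i++) {
      String current = lines[i];
      if (i + 1 < lines.length) {
        String next = lines[i + 1].trim();
        if (next.startsWith(".") && !next.startsWith("..") && !next.startsWith("./")) {
          // Defer evaluation: the next line continues this expression.
          pending.append(current).append("\n");
          continue;
        }
      }
      // Emit everything accumulated so far as one statement.
      statements.add(pending + current);
      pending.setLength(0);
    }
    return statements;
  }

  public static void main(String[] args) {
    String[] lines = {
      "val top10 = wordCount",
      "  .groupAll",
      "  .take(10)",
      "top10.dump"
    };
    for (String statement : join(lines)) {
      System.out.println("[" + statement.replace("\n", "\\n") + "]");
    }
  }
}
```

With the sample input, the first three lines are grouped into a single statement and `top10.dump` is emitted separately, which is why multi-line chained calls such as the `top10` example in the docs can be pasted into a paragraph as written.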
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
new file mode 100644
index 0000000..bd23c49
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import java.io.{BufferedReader, File, FileReader}
+
+import scala.tools.nsc.GenericRunnerSettings
+import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter}
+
+
+/**
+ * A class providing Scalding specific commands for inclusion in the Scalding REPL.
+ * This is currently forked from Scalding, but should eventually make it into Scalding itself:
+ * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala
+ */
+ class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter)
+ extends ILoop(in0, out) {
+ // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
+ // def this() = this(None, new JPrintWriter(Console.out, true))
+
+ settings = new GenericRunnerSettings({ s => echo(s) })
+
+ override def printWelcome() {
+ val fc = Console.YELLOW
+ val wc = Console.RED
+ def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc)
+ echo(fc +
+ " ( \n" +
+ " )\\ ) ( ( \n" +
+ "(()/( ) )\\ )\\ ) ( ( ( \n" +
+ " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" +
+ "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc +
+ wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") +
+ wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") +
+ "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" +
+ " |___/ ")
+ }
+
+ /**
+ * Commands specific to the Scalding REPL. To define a new command use one of the following
+ * factory methods:
+ * - `LoopCommand.nullary` for commands that take no arguments
+ * - `LoopCommand.cmd` for commands that take one string argument
+ * - `LoopCommand.varargs` for commands that take multiple string arguments
+ */
+ private val scaldingCommands: List[LoopCommand] = List()
+
+ /**
+ * Change the shell prompt to read scalding>
+ *
+ * @return a prompt string to use for this REPL.
+ */
+ override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET
+
+ private[this] def addImports(ids: String*): IR.Result =
+ if (ids.isEmpty) IR.Success
+ else intp.interpret("import " + ids.mkString(", "))
+
+ /**
+ * Search for files with the given name in all directories from current directory
+ * up to root.
+ */
+ private def findAllUpPath(filename: String): List[File] =
+ Iterator.iterate(new File(System.getProperty("user.dir")))(_.getParentFile)
+ .takeWhile(d => d != null && d.getPath != "/")
+ .flatMap(d => Option(d.listFiles).getOrElse(Array.empty[File]).toList)
+ .filter(_.toString.endsWith(filename)).toList
+
+ /**
+ * Gets the list of commands that this REPL supports.
+ *
+ * @return a list of the commands supported by this REPL.
+ */
+ override def commands: List[LoopCommand] = super.commands ++ scaldingCommands
+
+ protected def imports: List[String] = List(
+ "com.twitter.scalding._",
+ "com.twitter.scalding.ReplImplicits._",
+ "com.twitter.scalding.ReplImplicitContext._",
+ "com.twitter.scalding.ReplState._")
+
+ override def createInterpreter() {
+ super.createInterpreter()
+ intp.beQuietDuring {
+ addImports(imports: _*)
+
+ settings match {
+ case s: GenericRunnerSettings =>
+ findAllUpPath(".scalding_repl").reverse.foreach {
+ f => s.loadfiles.appendToValue(f.toString)
+ }
+ case _ => ()
+ }
+ }
+ }
+}
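Note on findAllUpPath above: the upward directory search it performs can be sketched in plain Java as follows. This is an illustrative sketch only and is not part of this commit; the class name FindUpPath and its method signature are hypothetical. It assumes that a null result from File.listFiles (unreadable path or not a directory) should be treated as "no matches", and, unlike the Scala version, it also inspects the filesystem root. Results are ordered nearest directory first, so a caller that wants outer files loaded before inner ones would reverse the list, as createInterpreter does.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Zeppelin or Scalding code base.
class FindUpPath {

  /**
   * Collects files whose names end with the given suffix, starting in
   * {@code start} and walking up through every parent directory.
   * The nearest directory's matches come first in the returned list.
   */
  static List<File> findAllUpPath(File start, String suffix) {
    List<File> found = new ArrayList<>();
    // getParentFile() returns null once the root has been visited.
    for (File dir = start.getAbsoluteFile(); dir != null; dir = dir.getParentFile()) {
      File[] entries = dir.listFiles();
      if (entries == null) {
        continue; // unreadable or not a directory: treat as empty
      }
      for (File entry : entries) {
        if (entry.getName().endsWith(suffix)) {
          found.add(entry);
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    File cwd = new File(System.getProperty("user.dir"));
    for (File f : findAllUpPath(cwd, ".scalding_repl")) {
      System.out.println(f);
    }
  }
}
```

Matching on the file name rather than on the full path string is a deliberate choice in this sketch; the Scala code matches on the end of the full path, which gives the same result for a name such as ".scalding_repl".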
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
new file mode 100644
index 0000000..7a753fa
--- /dev/null
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Tests for the Scalding interpreter for Zeppelin.
+ *
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ScaldingInterpreterTest {
+ public static ScaldingInterpreter repl;
+ private InterpreterContext context;
+ private File tmpDir;
+
+ @Before
+ public void setUp() throws Exception {
+ tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
+ System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
+
+ tmpDir.mkdirs();
+
+ if (repl == null) {
+ Properties p = new Properties();
+
+ repl = new ScaldingInterpreter(p);
+ repl.open();
+ }
+
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ context = new InterpreterContext("note", "id", "title", "text",
+ new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
+ intpGroup.getId(), null),
+ new LinkedList<InterpreterContextRunner>());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ delete(tmpDir);
+ repl.close();
+ }
+
+ private void delete(File file) {
+ if (file.isFile()) file.delete();
+ else if (file.isDirectory()) {
+ File[] files = file.listFiles();
+ if (files != null && files.length > 0) {
+ for (File f : files) {
+ delete(f);
+ }
+ }
+ file.delete();
+ }
+ }
+
+ @Test
+ public void testBasicIntp() {
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ repl.interpret("val a = 1\nval b = 2", context).code());
+
+ // when interpreting an incomplete expression
+ InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+ assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+ assertTrue(incomplete.message().length() > 0); // expecting some error
+ // message
+ }
+
+ @Test
+ public void testBasicScalding() {
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ repl.interpret("case class Sale(state: String, name: String, sale: Int)\n" +
+ "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", \"A\", 20), Sale(\"VA\", \"B\", 15))\n" +
+ "val salesPipe = TypedPipe.from(salesList)\n" +
+ "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" +
+ " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" +
+ "results.dump",
+ context).code());
+ }
+
+ @Test
+ public void testNextLineInvocation() {
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
+ }
+
+ @Test
+ public void testEndWithComment() {
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+ }
+
+ @Test
+ public void testReferencingUndefinedVal() {
+ InterpreterResult result = repl.interpret("def category(min: Int) = {"
+ + " if (0 <= value) \"error\"" + "}", context);
+ assertEquals(Code.ERROR, result.code());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/8fdaaba9/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 87d1c20..ed3b8c0 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -411,7 +411,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.kylin.KylinInterpreter,"
- + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter"),
+ + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+ + "org.apache.zeppelin.scalding.ScaldingInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),