Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/06/09 16:10:21 UTC
incubator-zeppelin git commit: [ZEPPELIN-840] Scalding interpreter that works in hdfs mode
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 43baa0af4 -> 96dbc6656
[ZEPPELIN-840] Scalding interpreter that works in hdfs mode
### What is this PR for?
Scalding interpreter that works in hdfs mode
### What type of PR is it?
Improvement
### Todos
* [x] - Update documentation.
### What is the Jira issue?
[ZEPPELIN-840](https://issues.apache.org/jira/browse/ZEPPELIN-840)
### How should this be tested?
1. The remote interpreter has to be run on a system with Hadoop libraries.
1. Run "%scalding mode" and verify that it is Hdfs.
1. Run a command that creates a map-reduce job. For example, "TypedPipe.from(TextLine("/user/pwagle/testfile")).filter(x => x == "a").toList"
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes
Author: Prasad Wagle <pw...@twitter.com>
Closes #917 from prasadwagle/ZEPPELIN-840 and squashes the following commits:
e91efd1 [Prasad Wagle] Restore document section on how to build scalding interpreter by enabling scalding profile
a001660 [Prasad Wagle] Restore scalding profile
f59bb55 [Prasad Wagle] Revert scala version change in zeppelin-server/pom.xml
730d5b6 [Prasad Wagle] Remove scalding profile from pom.xml
d74c52c [Prasad Wagle] Remove -Pscalding from .travis.yml PROFILE variable
92bfcca [Prasad Wagle] Fix checkstyle error
a29867d [Prasad Wagle] Update scalding interpreter doc
5db8daf [Prasad Wagle] Make variables final to avoid java 1.7 compiler error, go back to java 1.7
44e5702 [Prasad Wagle] Fix checkstyle error
d2052ca [Prasad Wagle] Re-add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency
301f126 [Prasad Wagle] Change scalding.version to 0.16.1-RC1
a84ea3e [Prasad Wagle] Use oraclejdk8 in travis.yml
9060073 [Prasad Wagle] Move ZeppelinScaldingShell to org.apache.zeppelin.scalding, use java 1.8, scala 2.11
0e43d00 [Prasad Wagle] Add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency
5908935 [Prasad Wagle] Scalding interpreter that works in hdfs mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/96dbc665
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/96dbc665
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/96dbc665
Branch: refs/heads/master
Commit: 96dbc6656be6df5a19fd01af6b147327702e4392
Parents: 43baa0a
Author: Prasad Wagle <pw...@twitter.com>
Authored: Tue Jun 7 07:58:36 2016 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Thu Jun 9 09:11:49 2016 -0700
----------------------------------------------------------------------
docs/interpreter/scalding.md | 86 +++++++-
scalding/pom.xml | 49 ++++-
.../zeppelin/scalding/ScaldingInterpreter.java | 219 +++++++++----------
.../zeppelin/scalding/ScaldingILoop.scala | 111 ----------
.../zeppelin/scalding/ZeppelinReplState.scala | 48 ++++
.../scalding/ZeppelinScaldingLoop.scala | 46 ++++
.../scalding/ZeppelinScaldingShell.scala | 72 ++++++
.../scalding/ScaldingInterpreterTest.java | 3 +-
8 files changed, 390 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/docs/interpreter/scalding.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md
index 4430312..ec5608b 100644
--- a/docs/interpreter/scalding.md
+++ b/docs/interpreter/scalding.md
@@ -28,10 +28,49 @@ In a notebook, to enable the **Scalding** interpreter, click on the **Gear** ico
</center>
### Configuring the Interpreter
-Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
+
+Scalding interpreter runs in two modes:
+
+* local
+* hdfs
+
+In local mode, you can access files on the local server, and Scalding transformations run locally.
+
+In hdfs mode, you can access files in HDFS, and Scalding transformations run as Hadoop map-reduce jobs.
+
+Zeppelin comes with a pre-configured Scalding interpreter in local mode.
+
+To run the scalding interpreter in the hdfs mode you have to do the following:
+
+**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES**
+
+In conf/zeppelin-env.sh, you have to set
+ZEPPELIN_CLASSPATH_OVERRIDES to the output of 'hadoop classpath'
+plus any directories with custom jar files you need for your scalding commands.
+
+**Set arguments to the scalding repl**
+
+The default arguments are: "--local --repl"
+
+For hdfs mode you need to add: "--hdfs --repl"
+
+If you want to add custom jars, you need to add:
+"-libjars directory/*:directory/*"
+
+For reducer estimation, you need to add something like:
+"-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator"
+
+**Set max.open.instances**
+
+If you want to control the maximum number of open interpreters, you have to select "scoped" interpreter for note
+option and set max.open.instances argument.
### Testing the Interpreter
-In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book.
+
+#### Local mode
+
+In this example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial,
+we will count words (of course!), and plot a graph of the top 10 words in the book.
```
%scalding
@@ -71,7 +110,44 @@ print("%table " + table)
If you click on the icon for the pie chart, you should be able to see a chart like this:
![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png)
-### Current Status & Future Work
-The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates.
-The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).
+#### HDFS mode
+
+**Test mode**
+
+```
+%scalding
+mode
+```
+This command should print:
+
+```
+res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)
+```
+
+
+**Test HDFS read**
+
+```
+val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
+testfile.dump
+```
+
+This command should print the contents of the hdfs file /user/x/testfile.
+
+**Test map-reduce job**
+
+```
+val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
+val a = testfile.groupAll.size.values
+a.toList
+
+```
+
+This command should create a map reduce job.
+
+### Future Work
+* Better user feedback (hadoop url, progress updates)
+* Ability to cancel jobs
+* Ability to dynamically load jars without restarting the interpreter
+* Multiuser scalability (run scalding interpreters on different servers)
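Reviewer note on the documentation above: the args.string value ("--local --repl" or "--hdfs --repl") is a single string that the interpreter splits into REPL arguments. The following is a minimal sketch of that idea, not the committed code. The class name `ScaldingArgsSketch` and both methods are hypothetical; the commit itself splits on a single space and checks for the substring "hdfs", whereas this sketch splits on any whitespace, matches the exact `--hdfs` flag, and falls back to the defaults on blank input, all of which are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the commit.
class ScaldingArgsSketch {
    // Same default value as ARGS_STRING_DEFAULT in the commit.
    static final String ARGS_STRING_DEFAULT = "--local --repl";

    /** Splits an args.string value into REPL arguments; null or blank input yields the defaults. */
    static List<String> parseArgs(String argsString) {
        String effective = (argsString == null || argsString.trim().isEmpty())
            ? ARGS_STRING_DEFAULT
            : argsString.trim();
        // Split on runs of whitespace so that doubled spaces do not produce empty arguments.
        return Arrays.asList(effective.split("\\s+"));
    }

    /** Returns true when the exact flag --hdfs is among the arguments. */
    static boolean isHdfsMode(List<String> args) {
        return args.contains("--hdfs");
    }
}
```

With this sketch, `parseArgs("--hdfs --repl")` yields a two-element list for which `isHdfsMode` returns true, while a null or blank setting falls back to the local-mode defaults.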
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/pom.xml
----------------------------------------------------------------------
diff --git a/scalding/pom.xml b/scalding/pom.xml
index 2b04f66..a3b3b58 100644
--- a/scalding/pom.xml
+++ b/scalding/pom.xml
@@ -34,9 +34,9 @@
<url>http://zeppelin.apache.org</url>
<properties>
- <scala.version>2.10.4</scala.version>
- <hadoop.version>2.3.0</hadoop.version>
- <scalding.version>0.15.1-RC13</scalding.version>
+ <scala.version>2.11.8</scala.version>
+ <hadoop.version>2.6.0</hadoop.version>
+ <scalding.version>0.16.1-RC1</scalding.version>
</properties>
<repositories>
@@ -45,6 +45,11 @@
<name>Concurrent Maven Repo</name>
<url>http://conjars.org/repo</url>
</repository>
+ <repository>
+ <id>twitter</id>
+ <name>Twitter Maven Repo</name>
+ <url>http://maven.twttr.com</url>
+ </repository>
</repositories>
<dependencies>
@@ -69,13 +74,43 @@
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>scalding-core_2.10</artifactId>
+ <artifactId>scalding-core_2.11</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-args_2.11</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-date_2.11</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-commons_2.11</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-avro_2.11</artifactId>
<version>${scalding.version}</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>scalding-repl_2.10</artifactId>
+ <artifactId>scalding-parquet_2.11</artifactId>
+ <version>${scalding.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scalding-repl_2.11</artifactId>
<version>${scalding.version}</version>
</dependency>
@@ -97,12 +132,12 @@
<version>${scala.version}</version>
</dependency>
- <!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode -->
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
+ <artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
----------------------------------------------------------------------
diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
index e808e70..4542297 100644
--- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
+++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
@@ -17,35 +17,29 @@
package org.apache.zeppelin.scalding;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.net.URL;
-import java.net.URLClassLoader;
-
+import com.twitter.scalding.ScaldingILoop;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Console;
-import scala.Some;
-import scala.None;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
/**
* Scalding interpreter for Zeppelin. Based off the Spark interpreter code.
@@ -54,16 +48,29 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
public class ScaldingInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class);
+ static final String ARGS_STRING = "args.string";
+ static final String ARGS_STRING_DEFAULT = "--local --repl";
+ static final String MAX_OPEN_INSTANCES = "max.open.instances";
+ static final String MAX_OPEN_INSTANCES_DEFAULT = "50";
+
public static final List<String> NO_COMPLETION =
Collections.unmodifiableList(new ArrayList<String>());
static {
- Interpreter.register("scalding", ScaldingInterpreter.class.getName());
+ Interpreter.register(
+ "scalding",
+ "scalding",
+ ScaldingInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add(ARGS_STRING, ARGS_STRING_DEFAULT, "Arguments for scalding REPL")
+ .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT,
+ "Maximum number of open interpreter instances")
+ .build());
}
+ static int numOpenInstances = 0;
private ScaldingILoop interpreter;
private ByteArrayOutputStream out;
- private Map<String, Object> binder;
public ScaldingInterpreter(Properties property) {
super(property);
@@ -72,104 +79,34 @@ public class ScaldingInterpreter extends Interpreter {
@Override
public void open() {
- URL[] urls = getClassloaderUrls();
-
- // Very nice discussion about how scala compiler handle classpath
- // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
-
- /*
- * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
- * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
- * nsc.Settings.classpath.
- *
- * >> val settings = new Settings() >> settings.usejavacp.value = true >>
- * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
- * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
- * getClass.getClassLoader >> } >> in.setContextClassLoader()
- */
- Settings settings = new Settings();
-
- // set classpath for scala compiler
- PathSetting pathSettings = settings.classpath();
- String classpath = "";
- List<File> paths = currentClassPath();
- for (File f : paths) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += f.getAbsolutePath();
+ numOpenInstances = numOpenInstances + 1;
+ String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES,
+ MAX_OPEN_INSTANCES_DEFAULT);
+ int maxOpenInstances = 50;
+ try {
+ maxOpenInstances = Integer.valueOf(maxOpenInstancesStr);
+ } catch (Exception e) {
+ logger.error("Error reading max.open.instances", e);
}
-
- if (urls != null) {
- for (URL u : urls) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += u.getFile();
- }
+ logger.info("max.open.instances = {}", maxOpenInstances);
+ if (numOpenInstances > maxOpenInstances) {
+ logger.error("Reached maximum number of open instances");
+ return;
}
-
- pathSettings.v_$eq(classpath);
- settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-
- // set classloader for scala compiler
- settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
- .getContextClassLoader()));
- BooleanSetting b = (BooleanSetting) settings.usejavacp();
- b.v_$eq(true);
- settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
- /* Scalding interpreter */
- PrintStream printStream = new PrintStream(out);
- interpreter = new ScaldingILoop(null, new PrintWriter(out));
- interpreter.settings_$eq(settings);
- interpreter.createInterpreter();
-
- interpreter.intp().
- interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
- binder = (Map<String, Object>) getValue("_binder");
- binder.put("out", printStream);
- }
-
- private Object getValue(String name) {
- Object ret = interpreter.intp().valueOfTerm(name);
- if (ret instanceof None) {
- return null;
- } else if (ret instanceof Some) {
- return ((Some) ret).get();
+ logger.info("Opening instance {}", numOpenInstances);
+ logger.info("property: {}", property);
+ String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT);
+ String[] args;
+ if (argsString == null) {
+ args = new String[0];
} else {
- return ret;
- }
- }
-
- private List<File> currentClassPath() {
- List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
- String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
- if (cps != null) {
- for (String cp : cps) {
- paths.add(new File(cp));
- }
- }
- return paths;
- }
-
- private List<File> classPath(ClassLoader cl) {
- List<File> paths = new LinkedList<File>();
- if (cl == null) {
- return paths;
+ args = argsString.split(" ");
}
+ logger.info("{}", Arrays.toString(args));
- if (cl instanceof URLClassLoader) {
- URLClassLoader ucl = (URLClassLoader) cl;
- URL[] urls = ucl.getURLs();
- if (urls != null) {
- for (URL url : urls) {
- paths.add(new File(url.getFile()));
- }
- }
- }
- return paths;
+ PrintWriter printWriter = new PrintWriter(out, true);
+ interpreter = ZeppelinScaldingShell.getRepl(args, printWriter);
+ interpreter.createInterpreter();
}
@Override
@@ -180,12 +117,49 @@ public class ScaldingInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
- logger.info("Running Scalding command '" + cmd + "'");
-
+ String user = contextInterpreter.getAuthenticationInfo().getUser();
+ logger.info("Running Scalding command: user: {} cmd: '{}'", user, cmd);
+
+ if (interpreter == null) {
+ logger.error(
+ "interpreter == null, open may not have been called because max.open.instances reached");
+ return new InterpreterResult(Code.ERROR,
+ "interpreter == null\n" +
+ "open may not have been called because max.open.instances reached"
+ );
+ }
if (cmd == null || cmd.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);
}
- return interpret(cmd.split("\n"), contextInterpreter);
+ InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR);
+ if (property.getProperty(ARGS_STRING).contains("hdfs")) {
+ UserGroupInformation ugi = null;
+ try {
+ ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+ } catch (IOException e) {
+ logger.error("Error creating UserGroupInformation", e);
+ return new InterpreterResult(Code.ERROR, e.getMessage());
+ }
+ try {
+ // Make variables final to avoid "local variable is accessed from within inner class;
+ // needs to be declared final" exception in JDK7
+ final String cmd1 = cmd;
+ final InterpreterContext contextInterpreter1 = contextInterpreter;
+ PrivilegedExceptionAction<InterpreterResult> action =
+ new PrivilegedExceptionAction<InterpreterResult>() {
+ public InterpreterResult run() throws Exception {
+ return interpret(cmd1.split("\n"), contextInterpreter1);
+ }
+ };
+ interpreterResult = ugi.doAs(action);
+ } catch (Exception e) {
+ logger.error("Error running command with ugi.doAs", e);
+ return new InterpreterResult(Code.ERROR, e.getMessage());
+ }
+ } else {
+ interpreterResult = interpret(cmd.split("\n"), contextInterpreter);
+ }
+ return interpreterResult;
}
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
@@ -205,8 +179,13 @@ public class ScaldingInterpreter extends Interpreter {
}
linesToRun[lines.length] = "print(\"\")";
- Console.setOut((java.io.PrintStream) binder.get("out"));
out.reset();
+
+ // Moving two lines below from open() to this function.
+ // If they are in open output is incomplete.
+ PrintStream printStream = new PrintStream(out, true);
+ Console.setOut(printStream);
+
Code r = null;
String incomplete = "";
boolean inComment = false;
@@ -261,7 +240,6 @@ public class ScaldingInterpreter extends Interpreter {
incomplete = "";
}
}
-
if (r == Code.INCOMPLETE) {
return new InterpreterResult(r, "Incomplete expression");
} else {
@@ -306,4 +284,5 @@ public class ScaldingInterpreter extends Interpreter {
public List<String> completion(String buf, int cursor) {
return NO_COMPLETION;
}
+
}
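Reviewer note on the max.open.instances handling in open(): the hunks above increment the shared counter before comparing it with the limit, so a rejected open still counts, and no decrement appears in the lines shown. A minimal sketch of a check-then-count variant follows. The class `InstanceLimitSketch`, its methods, and the `close()` counterpart are hypothetical and only illustrate the idea; they are not the committed implementation.

```java
import java.util.Properties;

// Hypothetical helper, not part of the commit.
class InstanceLimitSketch {
    static final String MAX_OPEN_INSTANCES = "max.open.instances";
    static final int MAX_OPEN_INSTANCES_DEFAULT = 50;

    private int numOpenInstances = 0;

    /** Reads the limit, falling back to the default when the value is missing or malformed. */
    static int readLimit(Properties property) {
        String raw = property.getProperty(MAX_OPEN_INSTANCES);
        if (raw == null) {
            return MAX_OPEN_INSTANCES_DEFAULT;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return MAX_OPEN_INSTANCES_DEFAULT;
        }
    }

    /** Counts the new instance only if the limit allows it; returns whether the open succeeded. */
    synchronized boolean tryOpen(Properties property) {
        if (numOpenInstances >= readLimit(property)) {
            return false;
        }
        numOpenInstances++;
        return true;
    }

    /** Assumed counterpart that releases a slot when an interpreter is closed. */
    synchronized void close() {
        if (numOpenInstances > 0) {
            numOpenInstances--;
        }
    }
}
```

Checking before incrementing keeps the counter equal to the number of interpreters that actually opened, and the synchronized methods guard against concurrent opens. Whether either property matters in practice depends on how Zeppelin schedules open() calls, which this sketch does not model.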
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
deleted file mode 100644
index bd23c49..0000000
--- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scalding;
-
-import java.io.{BufferedReader, File, FileReader}
-
-import scala.tools.nsc.GenericRunnerSettings
-import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter}
-
-
-/**
- * A class providing Scalding specific commands for inclusion in the Scalding REPL.
- * This is currently forked from Scalding, but should eventually make it into Scalding itself:
- * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala
- */
- class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter)
- extends ILoop(in0, out) {
- // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
- // def this() = this(None, new JPrintWriter(Console.out, true))
-
- settings = new GenericRunnerSettings({ s => echo(s) })
-
- override def printWelcome() {
- val fc = Console.YELLOW
- val wc = Console.RED
- def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc)
- echo(fc +
- " ( \n" +
- " )\\ ) ( ( \n" +
- "(()/( ) )\\ )\\ ) ( ( ( \n" +
- " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" +
- "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc +
- wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") +
- wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") +
- "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" +
- " |___/ ")
- }
-
- /**
- * Commands specific to the Scalding REPL. To define a new command use one of the following
- * factory methods:
- * - `LoopCommand.nullary` for commands that take no arguments
- * - `LoopCommand.cmd` for commands that take one string argument
- * - `LoopCommand.varargs` for commands that take multiple string arguments
- */
- private val scaldingCommands: List[LoopCommand] = List()
-
- /**
- * Change the shell prompt to read scalding>
- *
- * @return a prompt string to use for this REPL.
- */
- override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET
-
- private[this] def addImports(ids: String*): IR.Result =
- if (ids.isEmpty) IR.Success
- else intp.interpret("import " + ids.mkString(", "))
-
- /**
- * Search for files with the given name in all directories from current directory
- * up to root.
- */
- private def findAllUpPath(filename: String): List[File] =
- Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent)
- .takeWhile(_ != "/")
- .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename)))
- .toList
-
- /**
- * Gets the list of commands that this REPL supports.
- *
- * @return a list of the command supported by this REPL.
- */
- override def commands: List[LoopCommand] = super.commands ++ scaldingCommands
-
- protected def imports: List[String] = List(
- "com.twitter.scalding._",
- "com.twitter.scalding.ReplImplicits._",
- "com.twitter.scalding.ReplImplicitContext._",
- "com.twitter.scalding.ReplState._")
-
- override def createInterpreter() {
- super.createInterpreter()
- intp.beQuietDuring {
- addImports(imports: _*)
-
- settings match {
- case s: GenericRunnerSettings =>
- findAllUpPath(".scalding_repl").reverse.foreach {
- f => s.loadfiles.appendToValue(f.toString)
- }
- case _ => ()
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
new file mode 100644
index 0000000..b847eba
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding
+
+/**
+ * Stores REPL state
+ */
+
+import cascading.flow.FlowDef
+import com.twitter.scalding.BaseReplState
+import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext }
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+object ZeppelinReplState extends BaseReplState {
+ override def shell = ZeppelinScaldingShell
+}
+
+/**
+ * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly
+ * used everywhere.
+ */
+object ZeppelinReplImplicitContext {
+ /** Implicit execution context for using the Execution monad */
+ implicit val executionContext = ConcurrentExecutionContext.global
+ /** Implicit repl state used for ShellPipes */
+ implicit def stateImpl = ZeppelinReplState
+ /** Implicit flowDef for this Scalding shell session. */
+ implicit def flowDefImpl = ZeppelinReplState.flowDef
+ /** Defaults to running in local mode if no mode is specified. */
+ implicit def modeImpl = ZeppelinReplState.mode
+ implicit def configImpl = ZeppelinReplState.config
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
new file mode 100644
index 0000000..9be0199
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding
+
+import java.io.BufferedReader
+import com.twitter.scalding.ScaldingILoop
+
+import scala.tools.nsc.interpreter._
+
+/**
+ * TBD
+ */
+class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter)
+ extends ScaldingILoop(in, out) {
+
+ override protected def imports = List(
+ "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, ScaldingShell => ScaldingScaldingShell, _ }",
+ // ReplImplicits minus fields API parts (esp FieldConversions)
+ """com.twitter.scalding.ReplImplicits.{
+ iterableToSource,
+ keyedListLikeToShellTypedPipe,
+ typedPipeToShellTypedPipe,
+ valuePipeToShellValuePipe
+ }""",
+ "com.twitter.scalding.ReplImplicits",
+ "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._",
+ "org.apache.zeppelin.scalding.ZeppelinReplState",
+ "org.apache.zeppelin.scalding.ZeppelinReplState._"
+ )
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
new file mode 100644
index 0000000..29e5f83
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding
+
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TypedPipe
+import scala.tools.nsc.{GenericRunnerCommand}
+import scala.tools.nsc.interpreter._
+
+/**
+ * TBD
+ */
+object ZeppelinScaldingShell extends BaseScaldingShell {
+
+ override def replState = ZeppelinReplState
+
+ def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = {
+
+ val argsExpanded = ExpandLibJarsGlobs(args)
+ val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded)
+
+ // Process command line arguments into a settings object, and use that to start the REPL.
+ // We ignore params we don't care about - hence error function is empty
+ val command = new GenericRunnerCommand(cmdArgs, _ => ())
+
+ // inherit defaults for embedded interpretter (needed for running with SBT)
+ // (TypedPipe chosen arbitrarily, just needs to be something representative)
+ command.settings.embeddedDefaults[TypedPipe[String]]
+
+ // if running from the assembly, need to explicitly tell it to use java classpath
+ if (args.contains("--repl")) command.settings.usejavacp.value = true
+
+ command.settings.classpath.append(System.getProperty("java.class.path"))
+
+ // Force the repl to be synchronous, so all cmds are executed in the same thread
+ command.settings.Yreplsync.value = true
+
+ val repl = new ZeppelinScaldingILoop(None, out)
+ scaldingREPL = Some(repl)
+ replState.mode = mode
+ replState.customConfig = replState.customConfig ++ (mode match {
+ case _: HadoopMode => cfg
+ case _ => Config.empty
+ })
+
+ // if in Hdfs mode, store the mode to enable switching between Local and Hdfs
+ mode match {
+ case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m)
+ case _ => ()
+ }
+
+ repl.settings = command.settings
+ return repl;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 08c67da..7ffbd97 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -57,6 +57,7 @@ public class ScaldingInterpreterTest {
if (repl == null) {
Properties p = new Properties();
+ p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl");
repl = new ScaldingInterpreter(p);
repl.open();
@@ -119,7 +120,7 @@ public class ScaldingInterpreterTest {
"val salesPipe = TypedPipe.from(salesList)\n" +
"val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" +
" groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" +
- "results.dump",
+ "results.dump",
context).code());
}