Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/06/09 16:10:21 UTC

incubator-zeppelin git commit: [ZEPPELIN-840] Scalding interpreter that works in hdfs mode

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 43baa0af4 -> 96dbc6656


[ZEPPELIN-840] Scalding interpreter that works in hdfs mode

### What is this PR for?
Scalding interpreter that works in hdfs mode

### What type of PR is it?
Improvement

### Todos
* [x] - Update documentation.

### What is the Jira issue?
[ZEPPELIN-840](https://issues.apache.org/jira/browse/ZEPPELIN-840)

### How should this be tested?
1. Run the remote interpreter on a system that has the Hadoop libraries.
1. Run "%scalding mode" and verify that the mode is Hdfs.
1. Run a command that creates a map-reduce job, for example "TypedPipe.from(TextLine("/user/pwagle/testfile")).filter(x => x == "a").toList".
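
Step 2 only reports Hdfs when the interpreter's `args.string` property contains `--hdfs`. The following is a minimal sketch of that handling, modeled on `ScaldingInterpreter` in the diff of this commit: the argument string is split on spaces, and the HDFS code path is chosen when the string contains "hdfs". The class name `ScaldingArgsSketch` and its helper methods are hypothetical, and falling back to the default in both helpers is an assumption of the sketch rather than the committed behavior.

```java
import java.util.Arrays;
import java.util.Properties;

public class ScaldingArgsSketch {
    // Property name and default value as declared in ScaldingInterpreter.
    static final String ARGS_STRING = "args.string";
    static final String ARGS_STRING_DEFAULT = "--local --repl";

    // Split the configured argument string on single spaces.
    static String[] replArgs(Properties props) {
        String argsString = props.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT);
        return argsString.split(" ");
    }

    // The HDFS code path is taken when the argument string contains "hdfs".
    // Assumption: fall back to the default when the property is unset.
    static boolean usesHdfs(Properties props) {
        return props.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT).contains("hdfs");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ARGS_STRING, "--hdfs --repl");
        System.out.println(Arrays.toString(replArgs(props))); // [--hdfs, --repl]
        System.out.println(usesHdfs(props));                  // true
    }
}
```

With the property left unset, the sketch falls back to "--local --repl", so `usesHdfs` returns false and "%scalding mode" would not be expected to report Hdfs.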

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes

Author: Prasad Wagle <pw...@twitter.com>

Closes #917 from prasadwagle/ZEPPELIN-840 and squashes the following commits:

e91efd1 [Prasad Wagle] Restore document section on how to build scalding interpreter by enabling scalding profile
a001660 [Prasad Wagle] Restore scalding profile
f59bb55 [Prasad Wagle] Revert scala version change in zeppelin-server/pom.xml
730d5b6 [Prasad Wagle] Remove scalding profile from pom.xml
d74c52c [Prasad Wagle] Remove -Pscalding from .travis.yml PROFILE variable
92bfcca [Prasad Wagle] Fix checkstyle error
a29867d [Prasad Wagle] Update scalding interpreter doc
5db8daf [Prasad Wagle] Make variables final to avoid java 1.7 compiler error, go back to java 1.7
44e5702 [Prasad Wagle] Fix checkstyle error
d2052ca [Prasad Wagle] Re-add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency
301f126 [Prasad Wagle] Change scalding.version to 0.16.1-RC1
a84ea3e [Prasad Wagle] Use oraclejdk8 in travis.yml
9060073 [Prasad Wagle] Move ZeppelinScaldingShell to org.apache.zeppelin.scalding, use java 1.8, scala 2.11
0e43d00 [Prasad Wagle] Add http://maven.twttr.com repository to resolve com.hadoop.gplcompression:hadoop-lzo:jar:0.4.19 dependency
5908935 [Prasad Wagle] Scalding interpreter that works in hdfs mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/96dbc665
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/96dbc665
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/96dbc665

Branch: refs/heads/master
Commit: 96dbc6656be6df5a19fd01af6b147327702e4392
Parents: 43baa0a
Author: Prasad Wagle <pw...@twitter.com>
Authored: Tue Jun 7 07:58:36 2016 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Thu Jun 9 09:11:49 2016 -0700

----------------------------------------------------------------------
 docs/interpreter/scalding.md                    |  86 +++++++-
 scalding/pom.xml                                |  49 ++++-
 .../zeppelin/scalding/ScaldingInterpreter.java  | 219 +++++++++----------
 .../zeppelin/scalding/ScaldingILoop.scala       | 111 ----------
 .../zeppelin/scalding/ZeppelinReplState.scala   |  48 ++++
 .../scalding/ZeppelinScaldingLoop.scala         |  46 ++++
 .../scalding/ZeppelinScaldingShell.scala        |  72 ++++++
 .../scalding/ScaldingInterpreterTest.java       |   3 +-
 8 files changed, 390 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/docs/interpreter/scalding.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md
index 4430312..ec5608b 100644
--- a/docs/interpreter/scalding.md
+++ b/docs/interpreter/scalding.md
@@ -28,10 +28,49 @@ In a notebook, to enable the **Scalding** interpreter, click on the **Gear** ico
 </center>
 
 ### Configuring the Interpreter
-Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything.
+
+Scalding interpreter runs in two modes:
+
+* local
+* hdfs
+
+In local mode, you can access files on the local server, and Scalding transformations run locally.
+
+In hdfs mode, you can access files in HDFS, and Scalding transformations run as Hadoop map-reduce jobs.
+
+Zeppelin comes with a pre-configured Scalding interpreter in local mode.
+
+To run the scalding interpreter in the hdfs mode you have to do the following:
+
+**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES**
+
+In conf/zeppelin-env.sh, you have to set
+ZEPPELIN_CLASSPATH_OVERRIDES to the output of 'hadoop classpath'
+plus any directories with custom jar files you need for your Scalding commands.
+
+**Set arguments to the scalding repl**
+
+The default arguments are: "--local --repl"
+
+For hdfs mode you need to add: "--hdfs --repl"
+
+If you want to add custom jars, you need to add:
+"-libjars directory/*:directory/*"
+
+For reducer estimation, you need to add something like:
+"-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator"
+
+**Set max.open.instances**
+
+To control the maximum number of open interpreters, select the "scoped" interpreter for note
+option and set the max.open.instances argument.
 
 ### Testing the Interpreter
-In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book.
+
+#### Local mode
+
+As an example, using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial,
+we will count words (of course!) and plot a graph of the top 10 words in the book.
 
 ```
 %scalding
@@ -71,7 +110,44 @@ print("%table " + table)
 If you click on the icon for the pie chart, you should be able to see a chart like this:
 ![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png)
 
-### Current Status & Future Work
-The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates.
 
-The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!).
+#### HDFS mode
+
+**Test mode**
+
+```
+%scalding
+mode
+```
+This command should print:
+
+```
+res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)
+```
+
+
+**Test HDFS read**
+
+```
+val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
+testfile.dump
+```
+
+This command should print the contents of the hdfs file /user/x/testfile.
+
+**Test map-reduce job**
+
+```
+val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
+val a = testfile.groupAll.size.values
+a.toList
+
+```
+
+This command should create a map reduce job.
+
+### Future Work
+* Better user feedback (hadoop url, progress updates)
+* Ability to cancel jobs
+* Ability to dynamically load jars without restarting the interpreter
+* Multiuser scalability (run scalding interpreters on different servers)

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/pom.xml
----------------------------------------------------------------------
diff --git a/scalding/pom.xml b/scalding/pom.xml
index 2b04f66..a3b3b58 100644
--- a/scalding/pom.xml
+++ b/scalding/pom.xml
@@ -34,9 +34,9 @@
   <url>http://zeppelin.apache.org</url>
 
   <properties>
-    <scala.version>2.10.4</scala.version>
-    <hadoop.version>2.3.0</hadoop.version>
-    <scalding.version>0.15.1-RC13</scalding.version>
+    <scala.version>2.11.8</scala.version>
+    <hadoop.version>2.6.0</hadoop.version>
+    <scalding.version>0.16.1-RC1</scalding.version>
   </properties>
 
   <repositories>
@@ -45,6 +45,11 @@
       <name>Concurrent Maven Repo</name>
       <url>http://conjars.org/repo</url>
     </repository>
+    <repository>
+      <id>twitter</id>
+      <name>Twitter Maven Repo</name>
+      <url>http://maven.twttr.com</url>
+    </repository>
   </repositories>
 
   <dependencies>
@@ -69,13 +74,43 @@
 
     <dependency>
       <groupId>com.twitter</groupId>
-      <artifactId>scalding-core_2.10</artifactId>
+      <artifactId>scalding-core_2.11</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-args_2.11</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-date_2.11</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-commons_2.11</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-avro_2.11</artifactId>
       <version>${scalding.version}</version>
     </dependency>
 
     <dependency>
       <groupId>com.twitter</groupId>
-      <artifactId>scalding-repl_2.10</artifactId>
+      <artifactId>scalding-parquet_2.11</artifactId>
+      <version>${scalding.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scalding-repl_2.11</artifactId>
       <version>${scalding.version}</version>
     </dependency>
 
@@ -97,12 +132,12 @@
       <version>${scala.version}</version>
     </dependency>
 
-    <!-- Scalding REPL needs org.apache.hadoop.conf.Configuration even in local mode -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
+      <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
----------------------------------------------------------------------
diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
index e808e70..4542297 100644
--- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
+++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
@@ -17,35 +17,29 @@
 
 package org.apache.zeppelin.scalding;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.net.URL;
-import java.net.URLClassLoader;
-
+import com.twitter.scalding.ScaldingILoop;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Console;
-import scala.Some;
-import scala.None;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
 
 /**
  * Scalding interpreter for Zeppelin. Based off the Spark interpreter code.
@@ -54,16 +48,29 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
 public class ScaldingInterpreter extends Interpreter {
   Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class);
 
+  static final String ARGS_STRING = "args.string";
+  static final String ARGS_STRING_DEFAULT = "--local --repl";
+  static final String MAX_OPEN_INSTANCES = "max.open.instances";
+  static final String MAX_OPEN_INSTANCES_DEFAULT = "50";
+
   public static final List<String> NO_COMPLETION = 
     Collections.unmodifiableList(new ArrayList<String>());
 
   static {
-    Interpreter.register("scalding", ScaldingInterpreter.class.getName());
+    Interpreter.register(
+      "scalding",
+      "scalding",
+      ScaldingInterpreter.class.getName(),
+      new InterpreterPropertyBuilder()
+        .add(ARGS_STRING, ARGS_STRING_DEFAULT, "Arguments for scalding REPL")
+        .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT,
+                "Maximum number of open interpreter instances")
+        .build());
   }
 
+  static int numOpenInstances = 0;
   private ScaldingILoop interpreter;
   private ByteArrayOutputStream out;
-  private Map<String, Object> binder;
 
   public ScaldingInterpreter(Properties property) {
     super(property);
@@ -72,104 +79,34 @@ public class ScaldingInterpreter extends Interpreter {
 
   @Override
   public void open() {
-    URL[] urls = getClassloaderUrls();
-
-    // Very nice discussion about how scala compiler handle classpath
-    // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
-
-    /*
-     * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
-     * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
-     * nsc.Settings.classpath.
-     *
-     * >> val settings = new Settings() >> settings.usejavacp.value = true >>
-     * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
-     * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
-     * getClass.getClassLoader >> } >> in.setContextClassLoader()
-     */
-    Settings settings = new Settings();
-
-    // set classpath for scala compiler
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
+    numOpenInstances = numOpenInstances + 1;
+    String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES,
+            MAX_OPEN_INSTANCES_DEFAULT);
+    int maxOpenInstances = 50;
+    try {
+      maxOpenInstances = Integer.valueOf(maxOpenInstancesStr);
+    } catch (Exception e) {
+      logger.error("Error reading max.open.instances", e);
     }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
+    logger.info("max.open.instances = {}", maxOpenInstances);
+    if (numOpenInstances > maxOpenInstances) {
+      logger.error("Reached maximum number of open instances");
+      return;
     }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-
-    // set classloader for scala compiler
-    settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
-        .getContextClassLoader()));
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    /* Scalding interpreter */
-    PrintStream printStream = new PrintStream(out);
-    interpreter = new ScaldingILoop(null, new PrintWriter(out));
-    interpreter.settings_$eq(settings);
-    interpreter.createInterpreter();
-
-    interpreter.intp().
-      interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
-    binder = (Map<String, Object>) getValue("_binder");
-    binder.put("out", printStream);
-  }
-
-  private Object getValue(String name) {
-    Object ret = interpreter.intp().valueOfTerm(name);
-    if (ret instanceof None) {
-      return null;
-    } else if (ret instanceof Some) {
-      return ((Some) ret).get();
+    logger.info("Opening instance {}", numOpenInstances);
+    logger.info("property: {}", property);
+    String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT);
+    String[] args;
+    if (argsString == null) {
+      args = new String[0];
     } else {
-      return ret;
-    }
-  }
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
-
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<File>();
-    if (cl == null) {
-      return paths;
+      args = argsString.split(" ");
     }
+    logger.info("{}", Arrays.toString(args));
 
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
+    PrintWriter printWriter = new PrintWriter(out, true);
+    interpreter = ZeppelinScaldingShell.getRepl(args, printWriter);
+    interpreter.createInterpreter();
   }
 
   @Override
@@ -180,12 +117,49 @@ public class ScaldingInterpreter extends Interpreter {
 
   @Override
   public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
-    logger.info("Running Scalding command '" + cmd + "'");
-
+    String user = contextInterpreter.getAuthenticationInfo().getUser();
+    logger.info("Running Scalding command: user: {} cmd: '{}'", user, cmd);
+
+    if (interpreter == null) {
+      logger.error(
+        "interpreter == null, open may not have been called because max.open.instances reached");
+      return new InterpreterResult(Code.ERROR,
+        "interpreter == null\n" +
+        "open may not have been called because max.open.instances reached"
+      );
+    }
     if (cmd == null || cmd.trim().length() == 0) {
       return new InterpreterResult(Code.SUCCESS);
     }
-    return interpret(cmd.split("\n"), contextInterpreter);
+    InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR);
+    if (property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT).contains("hdfs")) {
+      UserGroupInformation ugi = null;
+      try {
+        ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+      } catch (IOException e) {
+        logger.error("Error creating UserGroupInformation", e);
+        return new InterpreterResult(Code.ERROR, e.getMessage());
+      }
+      try {
+        // Make variables final to avoid "local variable is accessed from within inner class;
+        // needs to be declared final" exception in JDK7
+        final String cmd1 = cmd;
+        final InterpreterContext contextInterpreter1 = contextInterpreter;
+        PrivilegedExceptionAction<InterpreterResult> action =
+          new PrivilegedExceptionAction<InterpreterResult>() {
+            public InterpreterResult run() throws Exception {
+              return interpret(cmd1.split("\n"), contextInterpreter1);
+            }
+          };
+        interpreterResult = ugi.doAs(action);
+      } catch (Exception e) {
+        logger.error("Error running command with ugi.doAs", e);
+        return new InterpreterResult(Code.ERROR, e.getMessage());
+      }
+    } else {
+      interpreterResult = interpret(cmd.split("\n"), contextInterpreter);
+    }
+    return interpreterResult;
   }
 
   public InterpreterResult interpret(String[] lines, InterpreterContext context) {
@@ -205,8 +179,13 @@ public class ScaldingInterpreter extends Interpreter {
     }
     linesToRun[lines.length] = "print(\"\")";
 
-    Console.setOut((java.io.PrintStream) binder.get("out"));
     out.reset();
+
+    // The two lines below were moved here from open().
+    // If they stay in open(), the output is incomplete.
+    PrintStream printStream = new PrintStream(out, true);
+    Console.setOut(printStream);
+
     Code r = null;
     String incomplete = "";
     boolean inComment = false;
@@ -261,7 +240,6 @@ public class ScaldingInterpreter extends Interpreter {
         incomplete = "";
       }
     }
-
     if (r == Code.INCOMPLETE) {
       return new InterpreterResult(r, "Incomplete expression");
     } else {
@@ -306,4 +284,5 @@ public class ScaldingInterpreter extends Interpreter {
   public List<String> completion(String buf, int cursor) {
     return NO_COMPLETION;
   }
+
 }
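
The instance limit that `open()` enforces in the diff above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the committed code: the class `MaxOpenInstancesSketch` and its methods are hypothetical, and the sketch uses `Integer.parseInt` with `trim()` where the commit calls `Integer.valueOf` and catches a general `Exception`. The rule it mirrors is that the counter is incremented first and the open is refused once the counter exceeds `max.open.instances`.

```java
import java.util.Properties;

public class MaxOpenInstancesSketch {
    // Property name and default as declared in ScaldingInterpreter.
    static final String MAX_OPEN_INSTANCES = "max.open.instances";
    static final int MAX_OPEN_INSTANCES_DEFAULT = 50;

    // Read the limit, falling back to the default when the value is
    // missing or is not a valid integer.
    static int maxOpenInstances(Properties props) {
        String raw = props.getProperty(MAX_OPEN_INSTANCES);
        if (raw == null) {
            return MAX_OPEN_INSTANCES_DEFAULT;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return MAX_OPEN_INSTANCES_DEFAULT;
        }
    }

    // One more instance may open only if the incremented count
    // stays within the limit.
    static boolean mayOpen(int alreadyOpen, Properties props) {
        return alreadyOpen + 1 <= maxOpenInstances(props);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(MAX_OPEN_INSTANCES, "2");
        System.out.println(mayOpen(1, props)); // true: the second instance fits
        System.out.println(mayOpen(2, props)); // false: a third would exceed the limit
    }
}
```

In the committed code a refused `open()` leaves the `interpreter` field null, which is why `interpret()` checks for null and returns an error result.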

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
deleted file mode 100644
index bd23c49..0000000
--- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scalding;
-
-import java.io.{BufferedReader, File, FileReader}
-
-import scala.tools.nsc.GenericRunnerSettings
-import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter}
-
-
-/**
- * A class providing Scalding specific commands for inclusion in the Scalding REPL.
- * This is currently forked from Scalding, but should eventually make it into Scalding itself:
- * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala
- */
- class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter)
-    extends ILoop(in0, out) {
-  // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
-  // def this() = this(None, new JPrintWriter(Console.out, true))
-
-  settings = new GenericRunnerSettings({ s => echo(s) })
-
-  override def printWelcome() {
-    val fc = Console.YELLOW
-    val wc = Console.RED
-    def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc)
-    echo(fc +
-      " (                                           \n" +
-      " )\\ )            (   (                       \n" +
-      "(()/(         )  )\\  )\\ )  (          (  (   \n" +
-      " /(_)) (   ( /( ((_)(()/( )\\   (     )\\))(  \n" +
-      "(_))   )\\  )( )) _   ((_)(( )  )\\ ) (( ))\\  \n".replaceAll("_", wc + "_" + fc) + wc +
-      wrapFlames("/ __|((_) ((_)_ | |  _| | (_) _(_(( (_()_) \n") +
-      wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\  \n") +
-      "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, |  \n" +
-      "                                    |___/   ")
-  }
-
-  /**
-   * Commands specific to the Scalding REPL. To define a new command use one of the following
-   * factory methods:
-   * - `LoopCommand.nullary` for commands that take no arguments
-   * - `LoopCommand.cmd` for commands that take one string argument
-   * - `LoopCommand.varargs` for commands that take multiple string arguments
-   */
-  private val scaldingCommands: List[LoopCommand] = List()
-
-  /**
-   * Change the shell prompt to read scalding&gt;
-   *
-   * @return a prompt string to use for this REPL.
-   */
-  override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET
-
-  private[this] def addImports(ids: String*): IR.Result =
-    if (ids.isEmpty) IR.Success
-    else intp.interpret("import " + ids.mkString(", "))
-
-  /**
-   * Search for files with the given name in all directories from current directory
-   * up to root.
-   */
-  private def findAllUpPath(filename: String): List[File] =
-    Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent)
-      .takeWhile(_ != "/")
-      .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename)))
-      .toList
-
-  /**
-   * Gets the list of commands that this REPL supports.
-   *
-   * @return a list of the command supported by this REPL.
-   */
-  override def commands: List[LoopCommand] = super.commands ++ scaldingCommands
-
-  protected def imports: List[String] = List(
-    "com.twitter.scalding._",
-    "com.twitter.scalding.ReplImplicits._",
-    "com.twitter.scalding.ReplImplicitContext._",
-    "com.twitter.scalding.ReplState._")
-
-  override def createInterpreter() {
-    super.createInterpreter()
-    intp.beQuietDuring {
-      addImports(imports: _*)
-
-      settings match {
-        case s: GenericRunnerSettings =>
-          findAllUpPath(".scalding_repl").reverse.foreach {
-            f => s.loadfiles.appendToValue(f.toString)
-          }
-        case _ => ()
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
new file mode 100644
index 0000000..b847eba
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding
+
+/**
+  * Stores REPL state
+  */
+
+import cascading.flow.FlowDef
+import com.twitter.scalding.BaseReplState
+import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext }
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+object ZeppelinReplState extends BaseReplState {
+  override def shell = ZeppelinScaldingShell
+}
+
+/**
+  * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly
+  * used everywhere.
+  */
+object ZeppelinReplImplicitContext {
+  /** Implicit execution context for using the Execution monad */
+  implicit val executionContext = ConcurrentExecutionContext.global
+  /** Implicit repl state used for ShellPipes */
+  implicit def stateImpl = ZeppelinReplState
+  /** Implicit flowDef for this Scalding shell session. */
+  implicit def flowDefImpl = ZeppelinReplState.flowDef
+  /** Defaults to running in local mode if no mode is specified. */
+  implicit def modeImpl = ZeppelinReplState.mode
+  implicit def configImpl = ZeppelinReplState.config
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
new file mode 100644
index 0000000..9be0199
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding
+
+import java.io.BufferedReader
+import com.twitter.scalding.ScaldingILoop
+
+import scala.tools.nsc.interpreter._
+
+/**
+  * TBD
+  */
+class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter)
+  extends ScaldingILoop(in, out) {
+
+  override protected def imports = List(
+    "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, ScaldingShell => ScaldingScaldingShell, _ }",
+    // ReplImplicits minus fields API parts (esp FieldConversions)
+    """com.twitter.scalding.ReplImplicits.{
+      iterableToSource,
+      keyedListLikeToShellTypedPipe,
+      typedPipeToShellTypedPipe,
+      valuePipeToShellValuePipe
+    }""",
+    "com.twitter.scalding.ReplImplicits",
+    "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._",
+    "org.apache.zeppelin.scalding.ZeppelinReplState",
+    "org.apache.zeppelin.scalding.ZeppelinReplState._"
+  )
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
----------------------------------------------------------------------
diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
new file mode 100644
index 0000000..29e5f83
--- /dev/null
+++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scalding
+
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TypedPipe
+import scala.tools.nsc.{GenericRunnerCommand}
+import scala.tools.nsc.interpreter._
+
+/**
+  * TBD
+  */
+object ZeppelinScaldingShell extends BaseScaldingShell {
+
+  override def replState = ZeppelinReplState
+
+  def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = {
+
+    val argsExpanded = ExpandLibJarsGlobs(args)
+    val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded)
+
+    // Process command line arguments into a settings object, and use that to start the REPL.
+    // We ignore params we don't care about - hence error function is empty
+    val command = new GenericRunnerCommand(cmdArgs, _ => ())
+
+    // inherit defaults for embedded interpreter (needed for running with SBT)
+    // (TypedPipe chosen arbitrarily, just needs to be something representative)
+    command.settings.embeddedDefaults[TypedPipe[String]]
+
+    // if running from the assembly, need to explicitly tell it to use java classpath
+    if (args.contains("--repl")) command.settings.usejavacp.value = true
+
+    command.settings.classpath.append(System.getProperty("java.class.path"))
+
+    // Force the repl to be synchronous, so all cmds are executed in the same thread
+    command.settings.Yreplsync.value = true
+
+    val repl = new ZeppelinScaldingILoop(None, out)
+    scaldingREPL = Some(repl)
+    replState.mode = mode
+    replState.customConfig = replState.customConfig ++ (mode match {
+      case _: HadoopMode => cfg
+      case _ => Config.empty
+    })
+
+    // if in Hdfs mode, store the mode to enable switching between Local and Hdfs
+    mode match {
+      case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m)
+      case _ => ()
+    }
+
+    repl.settings = command.settings
+    return repl;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/96dbc665/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 08c67da..7ffbd97 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -57,6 +57,7 @@ public class ScaldingInterpreterTest {
 
     if (repl == null) {
       Properties p = new Properties();
+      p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl");
 
       repl = new ScaldingInterpreter(p);
       repl.open();
@@ -119,7 +120,7 @@ public class ScaldingInterpreterTest {
           "val salesPipe = TypedPipe.from(salesList)\n" +
           "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" +
           "    groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" +
-          "results.dump", 
+          "results.dump",
           context).code());
   }