You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@zeppelin.apache.org by emmanuel warreng <em...@gmail.com> on 2022/07/20 17:33:38 UTC
Re: [zeppelin] branch master updated: [ZEPPELIN-5764] Remove Scalding interpreter (#4405)
Unsubscribe
On Fri, Jul 15, 2022, 06:34 <jo...@apache.org> wrote:
> This is an automated email from the ASF dual-hosted git repository.
>
> jongyoul pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/zeppelin.git
>
>
> The following commit(s) were added to refs/heads/master by this push:
> new 11d82c4bda [ZEPPELIN-5764] Remove Scalding interpreter (#4405)
> 11d82c4bda is described below
>
> commit 11d82c4bda72abe276f8c4309f617569c3f4a57c
> Author: 김민수 <al...@gmail.com>
> AuthorDate: Fri Jul 15 13:34:12 2022 +0900
>
> [ZEPPELIN-5764] Remove Scalding interpreter (#4405)
>
> * [ZEPPELIN-5764] Remove Scalding interpreter
>
> * [ZEPPELIN-5764] remove scalding interpreter docs
>
> in how_to_build.md
> remove scalding interpreter command example, scalding keyword
> ---
> .github/workflows/core.yml | 2 +-
> conf/interpreter-list | 1 -
> dev/create_release.sh | 2 +-
> docs/_includes/themes/zeppelin/_navigation.html | 1 -
> docs/index.md | 1 -
> docs/interpreter/scalding.md | 168 -------------
> docs/setup/basics/how_to_build.md | 8 +-
> docs/usage/interpreter/installation.md | 16 --
> pom.xml | 1 -
> scalding/pom.xml | 197 ---------------
> .../zeppelin/scalding/ScaldingInterpreter.java | 280
> ---------------------
> .../src/main/resources/interpreter-setting.json | 21 --
> .../zeppelin/scalding/ZeppelinReplState.scala | 48 ----
> .../zeppelin/scalding/ZeppelinScaldingLoop.scala | 46 ----
> .../zeppelin/scalding/ZeppelinScaldingShell.scala | 72 ------
> .../zeppelin/scalding/ScaldingInterpreterTest.java | 144 -----------
> 16 files changed, 3 insertions(+), 1005 deletions(-)
>
> diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
> index 4b3e42a369..db075e1bc2 100644
> --- a/.github/workflows/core.yml
> +++ b/.github/workflows/core.yml
> @@ -82,7 +82,7 @@ jobs:
> interpreter-test-non-core:
> runs-on: ubuntu-20.04
> env:
> - INTERPRETERS:
> 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql,scalding'
> + INTERPRETERS:
> 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql'
> steps:
> - name: Checkout
> uses: actions/checkout@v2
> diff --git a/conf/interpreter-list b/conf/interpreter-list
> index 76584969c0..270d243b5c 100644
> --- a/conf/interpreter-list
> +++ b/conf/interpreter-list
> @@ -39,7 +39,6 @@ neo4j
> org.apache.zeppelin:zeppelin-neo4j:0.10.0 Neo4j i
> pig org.apache.zeppelin:zeppelin-pig:0.10.0
> Pig interpreter
> python org.apache.zeppelin:zeppelin-python:0.10.0
> Python interpreter
> sap org.apache.zeppelin:zeppelin-sap:0.10.0
> SAP Support
> -scalding org.apache.zeppelin:zeppelin-scalding_2.0.10:0.10.0
> Scalding interpreter
> scio org.apache.zeppelin:zeppelin-scio:0.10.0
> Scio interpreter
> shell org.apache.zeppelin:zeppelin-shell:0.10.0
> Shell command
> sparql org.apache.zeppelin:zeppelin-sparql:0.10.0
> Sparql interpreter
> diff --git a/dev/create_release.sh b/dev/create_release.sh
> index a3bef0d1d5..ae2162aa9b 100755
> --- a/dev/create_release.sh
> +++ b/dev/create_release.sh
> @@ -97,7 +97,7 @@ function make_binary_release() {
>
> git_clone
> make_source_package
> -make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl
> !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql,!scalding
> -am"
> +make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl
> !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql
> -am"
> make_binary_release all "-Pweb-angular -Phadoop-2.6"
>
> # remove non release files and dirs
> diff --git a/docs/_includes/themes/zeppelin/_navigation.html
> b/docs/_includes/themes/zeppelin/_navigation.html
> index 205e8fc7fc..ceed569605 100644
> --- a/docs/_includes/themes/zeppelin/_navigation.html
> +++ b/docs/_includes/themes/zeppelin/_navigation.html
> @@ -163,7 +163,6 @@
> <li><a
> href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li>
> <li><a
> href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li>
> <li><a
> href="{{BASE_PATH}}/interpreter/sap.html">SAP</a></li>
> - <li><a
> href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
> <li><a
> href="{{BASE_PATH}}/interpreter/scio.html">Scio</a></li>
> <li><a
> href="{{BASE_PATH}}/interpreter/shell.html">Shell</a></li>
> <li><a
> href="{{BASE_PATH}}/interpreter/sparql.html">Sparql</a></li>
> diff --git a/docs/index.md b/docs/index.md
> index d955496160..d3e5a461d5 100644
> --- a/docs/index.md
> +++ b/docs/index.md
> @@ -163,7 +163,6 @@ limitations under the License.
> * [Python](./interpreter/python.html)
> * [R](./interpreter/r.html)
> * [SAP](./interpreter/sap.html)
> - * [Scalding](./interpreter/scalding.html)
> * [Scio](./interpreter/scio.html)
> * [Shell](./interpreter/shell.html)
> * [Spark](./interpreter/spark.html)
> diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md
> deleted file mode 100644
> index 1d55e59a38..0000000000
> --- a/docs/interpreter/scalding.md
> +++ /dev/null
> @@ -1,168 +0,0 @@
> ----
> -layout: page
> -title: "Scalding Interpreter for Apache Zeppelin"
> -description: "Scalding is an open source Scala library for writing
> MapReduce jobs."
> -group: interpreter
> ----
> -<!--
> -Licensed under the Apache License, Version 2.0 (the "License");
> -you may not use this file except in compliance with the License.
> -You may obtain a copy of the License at
> -
> -http://www.apache.org/licenses/LICENSE-2.0
> -
> -Unless required by applicable law or agreed to in writing, software
> -distributed under the License is distributed on an "AS IS" BASIS,
> -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> -See the License for the specific language governing permissions and
> -limitations under the License.
> --->
> -{% include JB/setup %}
> -
> -# Scalding Interpreter for Apache Zeppelin
> -
> -<div id="toc"></div>
> -
> -[Scalding](https://github.com/twitter/scalding) is an open source Scala
> library for writing MapReduce jobs.
> -
> -## Building the Scalding Interpreter
> -You have to first build the Scalding interpreter by enable the
> **scalding** profile as follows:
> -
> -```bash
> -./mvnw clean package -Pscalding -DskipTests
> -```
> -
> -## Enabling the Scalding Interpreter
> -In a notebook, to enable the **Scalding** interpreter, click on the
> **Gear** icon,select **Scalding**, and hit **Save**.
> -
> -<center>
> -
> -![Interpreter
> Binding]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/scalding-InterpreterBinding.png)
> -
> -![Interpreter
> Selection]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/scalding-InterpreterSelection.png)
> -
> -</center>
> -
> -## Configuring the Interpreter
> -
> -Scalding interpreter runs in two modes:
> -
> -* local
> -* hdfs
> -
> -In the local mode, you can access files on the local server and scalding
> transformation are done locally.
> -
> -In hdfs mode you can access files in HDFS and scalding transformation are
> run as hadoop map-reduce jobs.
> -
> -Zeppelin comes with a pre-configured Scalding interpreter in local mode.
> -
> -To run the scalding interpreter in the hdfs mode you have to do the
> following:
> -
> -**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES**
> -
> -In conf/zeppelin_env.sh, you have to set
> -ZEPPELIN_CLASSPATH_OVERRIDES to the contents of 'hadoop classpath'
> -and directories with custom jar files you need for your scalding commands.
> -
> -**Set arguments to the scalding repl**
> -
> -The default arguments are: `--local --repl`
> -
> -For hdfs mode you need to add: `--hdfs --repl`
> -
> -If you want to add custom jars, you need to add: `-libjars
> directory/*:directory/*`
> -
> -For reducer estimation, you need to add something like:
>
> -`-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator`
> -
> -**Set max.open.instances**
> -
> -If you want to control the maximum number of open interpreters, you have
> to select "scoped" interpreter for note
> -option and set `max.open.instances` argument.
> -
> -## Testing the Interpreter
> -
> -### Local mode
> -
> -In example, by using the [Alice in Wonderland](
> https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial,
> -we will count words (of course!), and plot a graph of the top 10 words in
> the book.
> -
> -```scala
> -%scalding
> -
> -import scala.io.Source
> -
> -// Get the Alice in Wonderland book from gutenberg.org:
> -val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt
> ").getLines
> -val aliceLineNum = alice.zipWithIndex.toList
> -val alicePipe = TypedPipe.from(aliceLineNum)
> -
> -// Now get a list of words for the book:
> -val aliceWords = alicePipe.flatMap { case (text, _) =>
> text.split("\\s+").toList }
> -
> -// Now lets add a count for each word:
> -val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word =>
> (word, 1L) }
> -
> -// let's sum them for each word:
> -val wordCount = aliceWithCount.group.sum
> -
> -print ("Here are the top 10 words\n")
> -val top10 = wordCount
> - .groupAll
> - .sortBy { case (word, count) => -count }
> - .take(10)
> -top10.dump
> -
> -```
> -```
> -%scalding
> -
> -val table = "words\t count\n" + top10.toIterator.map{case (k, (word,
> count)) => s"$word\t$count"}.mkString("\n")
> -print("%table " + table)
> -
> -```
> -
> -If you click on the icon for the pie chart, you should be able to see a
> chart like this:
> -![Scalding - Pie -
> Chart]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/scalding-pie.png)
> -
> -
> -### HDFS mode
> -
> -**Test mode**
> -
> -```
> -%scalding
> -mode
> -```
> -This command should print:
> -
> -```
> -res4: com.twitter.scalding.Mode = Hdfs(true,Configuration:
> core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
> yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)
> -```
> -
> -
> -**Test HDFS read**
> -
> -```scala
> -val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
> -testfile.dump
> -```
> -
> -This command should print the contents of the hdfs file /user/x/testfile.
> -
> -**Test map-reduce job**
> -
> -```scala
> -val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
> -val a = testfile.groupAll.size.values
> -a.toList
> -
> -```
> -
> -This command should create a map reduce job.
> -
> -## Future Work
> -* Better user feedback (hadoop url, progress updates)
> -* Ability to cancel jobs
> -* Ability to dynamically load jars without restarting the interpreter
> -* Multiuser scalability (run scalding interpreters on different servers)
> diff --git a/docs/setup/basics/how_to_build.md
> b/docs/setup/basics/how_to_build.md
> index 56715a2fec..df5620af29 100644
> --- a/docs/setup/basics/how_to_build.md
> +++ b/docs/setup/basics/how_to_build.md
> @@ -82,7 +82,7 @@ You can directly start Zeppelin by running the following
> command after successfu
>
> #### Scala profile
>
> -To be noticed, this scala profile affect the modules (e.g. cassandra,
> scalding) that use scala except Spark interpreter (Spark interpreter use
> other profiles to control its scala version, see the doc below).
> +To be noticed, this scala profile affect the modules (e.g. cassandra)
> that use scala except Spark interpreter (Spark interpreter use other
> profiles to control its scala version, see the doc below).
>
> Set scala version (default 2.10). Available profiles are
>
> @@ -170,12 +170,6 @@ Ignite Interpreter
> ./mvnw clean package -Dignite.version=1.9.0 -DskipTests
> ```
>
> -Scalding Interpreter
> -
> -```bash
> -./mvnw clean package -Pscalding -DskipTests
> -```
> -
> ### Optional configurations
>
> Here are additional configurations that could be optionally tuned using
> the trailing `-D` option for maven commands
> diff --git a/docs/usage/interpreter/installation.md
> b/docs/usage/interpreter/installation.md
> index 2e26b19896..aaea7b8ebc 100644
> --- a/docs/usage/interpreter/installation.md
> +++ b/docs/usage/interpreter/installation.md
> @@ -61,19 +61,8 @@ Zeppelin support both Scala 2.10 and 2.11 for several
> interpreters as below:
> <td>org.apache.zeppelin:zeppelin-spark_2.10:0.10.0</td>
> <td>org.apache.zeppelin:zeppelin-spark_2.11:0.10.0</td>
> </tr>
> - <tr>
> - <td>scalding</td>
> - <td>org.apache.zeppelin:zeppelin-scalding_2.10:0.10.0</td>
> - <td>org.apache.zeppelin:zeppelin-scalding_2.11:0.10.0</td>
> - </tr>
> </table>
>
> -If you install one of these interpreters only with `--name` option,
> installer will download interpreter built with Scala 2.11 by default. If
> you want to specify Scala version, you will need to add `--artifact`
> option. Here is the example of installing flink interpreter built with
> Scala 2.10.
> -
> -```bash
> -./bin/install-interpreter.sh --name flink --artifact
> org.apache.zeppelin:zeppelin-scalding_2.10:0.10.0
> -```
> -
> #### Install Spark interpreter built with Scala 2.10
>
> Spark distribution package has been built with Scala 2.10 until 1.6.2. If
> you have `SPARK_HOME` set pointing to Spark version earlier than 2.0.0, you
> need to download Spark interpreter packaged with Scala 2.10. To do so, use
> follow command:
> @@ -223,11 +212,6 @@ You can also find the below community managed
> interpreter list in `conf/interpre
> <td>org.apache.zeppelin:zeppelin-sap:0.10.0</td>
> <td>SAP support</td>
> </tr>
> - <tr>
> - <td>scalding</td>
> - <td>org.apache.zeppelin:zeppelin-scalding_2.0.10:0.10.0</td>
> - <td>Scalding interpreter</td>
> - </tr>
> <tr>
> <td>scio</td>
> <td>org.apache.zeppelin:zeppelin-scio:0.10.0</td>
> diff --git a/pom.xml b/pom.xml
> index da5b999307..b4c1814c9a 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -86,7 +86,6 @@
> <module>scio</module>
> <module>neo4j</module>
> <module>sap</module>
> - <module>scalding</module>
> <module>java</module>
> <module>beam</module>
> <module>hazelcastjet</module>
> diff --git a/scalding/pom.xml b/scalding/pom.xml
> deleted file mode 100644
> index 54beb0b9d4..0000000000
> --- a/scalding/pom.xml
> +++ /dev/null
> @@ -1,197 +0,0 @@
> -<?xml version="1.0" encoding="UTF-8"?>
> -<!--
> - ~ Licensed to the Apache Software Foundation (ASF) under one or more
> - ~ contributor license agreements. See the NOTICE file distributed with
> - ~ this work for additional information regarding copyright ownership.
> - ~ The ASF licenses this file to You under the Apache License, Version
> 2.0
> - ~ (the "License"); you may not use this file except in compliance with
> - ~ the License. You may obtain a copy of the License at
> - ~
> - ~ http://www.apache.org/licenses/LICENSE-2.0
> - ~
> - ~ Unless required by applicable law or agreed to in writing, software
> - ~ distributed under the License is distributed on an "AS IS" BASIS,
> - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - ~ See the License for the specific language governing permissions and
> - ~ limitations under the License.
> - -->
> -
> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance"
> - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> https://maven.apache.org/xsd/maven-4.0.0.xsd">
> - <modelVersion>4.0.0</modelVersion>
> -
> - <parent>
> - <artifactId>zeppelin-interpreter-parent</artifactId>
> - <groupId>org.apache.zeppelin</groupId>
> - <version>0.11.0-SNAPSHOT</version>
> - <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
> - </parent>
> -
> - <artifactId>zeppelin-scalding_2.10</artifactId>
> - <packaging>jar</packaging>
> - <name>Zeppelin: Scalding interpreter</name>
> -
> - <properties>
> - <interpreter.name>scalding</interpreter.name>
> - <!--library versions-->
> - <hadoop.version>${hadoop2.6.version}</hadoop.version>
> - <scalding.version>0.16.1-RC1</scalding.version>
> -
> - <!--plugin versions-->
> - <plugin.scala.version>2.15.2</plugin.scala.version>
> - </properties>
> -
> - <repositories>
> - <repository>
> - <id>conjars</id>
> - <name>Concurrent Maven Repo</name>
> - <url>https://conjars.org/repo</url>
> - </repository>
> -
> - <!-- the twitter repo is unreliable (
> https://github.com/twitter/hadoop-lzo/issues/148) -->
> - <repository>
> - <id>twitter</id>
> - <name>Twitter Maven Repo</name>
> - <url>https://maven.twttr.com</url>
> - </repository>
> -
> - <!-- Temporary repo -->
> - <repository>
> - <id>zeppelin-dependencies</id>
> - <name>bintray</name>
> - <url>https://jetbrains.bintray.com/zeppelin-dependencies</url>
> - </repository>
> - </repositories>
> -
> - <dependencies>
> -
> - <dependency>
> - <groupId>org.apache.commons</groupId>
> - <artifactId>commons-exec</artifactId>
> - <version>${commons.exec.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-core_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-args_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-date_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-commons_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-avro_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-parquet_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>com.twitter</groupId>
> - <artifactId>scalding-repl_${scala.binary.version}</artifactId>
> - <version>${scalding.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>org.scala-lang</groupId>
> - <artifactId>scala-library</artifactId>
> - <version>${scala.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>org.scala-lang</groupId>
> - <artifactId>scala-compiler</artifactId>
> - <version>${scala.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>org.scala-lang</groupId>
> - <artifactId>scala-reflect</artifactId>
> - <version>${scala.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>org.apache.hadoop</groupId>
> - <artifactId>hadoop-client</artifactId>
> - <version>${hadoop.version}</version>
> - </dependency>
> -
> - <dependency>
> - <groupId>org.apache.hadoop</groupId>
> - <artifactId>hadoop-common</artifactId>
> - <version>${hadoop.version}</version>
> - </dependency>
> -
> - </dependencies>
> -
> - <build>
> - <plugins>
> - <plugin>
> - <artifactId>maven-enforcer-plugin</artifactId>
> - </plugin>
> - <plugin>
> - <artifactId>maven-resources-plugin</artifactId>
> - </plugin>
> - <plugin>
> - <artifactId>maven-shade-plugin</artifactId>
> - </plugin>
> - <!-- Plugin to compile Scala code -->
> - <plugin>
> - <groupId>org.scala-tools</groupId>
> - <artifactId>maven-scala-plugin</artifactId>
> - <executions>
> - <execution>
> - <id>compile</id>
> - <goals>
> - <goal>compile</goal>
> - </goals>
> - <phase>compile</phase>
> - </execution>
> - <execution>
> - <id>test-compile</id>
> - <goals>
> - <goal>testCompile</goal>
> - </goals>
> - <phase>test-compile</phase>
> - </execution>
> - <execution>
> - <phase>process-resources</phase>
> - <goals>
> - <goal>compile</goal>
> - </goals>
> - </execution>
> - </executions>
> - </plugin>
> -
> - <plugin>
> - <groupId>org.apache.maven.plugins</groupId>
> - <artifactId>maven-checkstyle-plugin</artifactId>
> - <configuration>
> - <skip>false</skip>
> - </configuration>
> - </plugin>
> - </plugins>
> - </build>
> -
> -</project>
> diff --git
> a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
> b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
> deleted file mode 100644
> index f104a587bb..0000000000
> ---
> a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java
> +++ /dev/null
> @@ -1,280 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to You under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.zeppelin.scalding;
> -
> -import org.apache.hadoop.security.UserGroupInformation;
> -import org.slf4j.Logger;
> -import org.slf4j.LoggerFactory;
> -
> -import java.io.ByteArrayOutputStream;
> -import java.io.IOException;
> -import java.io.PrintStream;
> -import java.io.PrintWriter;
> -import java.security.PrivilegedExceptionAction;
> -import java.util.ArrayList;
> -import java.util.Arrays;
> -import java.util.Collections;
> -import java.util.List;
> -import java.util.Properties;
> -
> -import com.twitter.scalding.ScaldingILoop;
> -
> -import scala.Console;
> -
> -import org.apache.zeppelin.interpreter.Interpreter;
> -import org.apache.zeppelin.interpreter.InterpreterContext;
> -import org.apache.zeppelin.interpreter.InterpreterResult;
> -import org.apache.zeppelin.interpreter.InterpreterResult.Code;
> -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
> -import org.apache.zeppelin.scheduler.Scheduler;
> -import org.apache.zeppelin.scheduler.SchedulerFactory;
> -
> -/**
> - * Scalding interpreter for Zeppelin. Based off the Spark interpreter
> code.
> - *
> - */
> -public class ScaldingInterpreter extends Interpreter {
> - public static final Logger LOGGER =
> LoggerFactory.getLogger(ScaldingInterpreter.class);
> -
> - static final String ARGS_STRING = "args.string";
> - static final String ARGS_STRING_DEFAULT = "--local --repl";
> - static final String MAX_OPEN_INSTANCES = "max.open.instances";
> - static final String MAX_OPEN_INSTANCES_DEFAULT = "50";
> -
> - public static final List NO_COMPLETION =
> Collections.unmodifiableList(new ArrayList<>());
> -
> - static int numOpenInstances = 0;
> - private ScaldingILoop interpreter;
> - private ByteArrayOutputStream out;
> -
> - public ScaldingInterpreter(Properties property) {
> - super(property);
> - out = new ByteArrayOutputStream();
> - }
> -
> - @Override
> - public void open() {
> - numOpenInstances = numOpenInstances + 1;
> - String maxOpenInstancesStr = getProperty(MAX_OPEN_INSTANCES,
> - MAX_OPEN_INSTANCES_DEFAULT);
> - int maxOpenInstances = 50;
> - try {
> - maxOpenInstances = Integer.valueOf(maxOpenInstancesStr);
> - } catch (Exception e) {
> - LOGGER.error("Error reading max.open.instances", e);
> - }
> - LOGGER.info("max.open.instances = {}", maxOpenInstances);
> - if (numOpenInstances > maxOpenInstances) {
> - LOGGER.error("Reached maximum number of open instances");
> - return;
> - }
> - LOGGER.info("Opening instance {}", numOpenInstances);
> - LOGGER.info("property: {}", getProperties());
> - String argsString = getProperty(ARGS_STRING, ARGS_STRING_DEFAULT);
> - String[] args;
> - if (argsString == null) {
> - args = new String[0];
> - } else {
> - args = argsString.split(" ");
> - }
> - LOGGER.info("{}", Arrays.toString(args));
> -
> - PrintWriter printWriter = new PrintWriter(out, true);
> - interpreter = ZeppelinScaldingShell.getRepl(args, printWriter);
> - interpreter.createInterpreter();
> - }
> -
> - @Override
> - public void close() {
> - interpreter.intp().close();
> - }
> -
> -
> - @Override
> - public InterpreterResult interpret(String cmd, InterpreterContext
> contextInterpreter) {
> - String user = contextInterpreter.getAuthenticationInfo().getUser();
> - LOGGER.info("Running Scalding command: user: {} cmd: '{}'", user,
> cmd);
> -
> - if (interpreter == null) {
> - LOGGER.error(
> - "interpreter == null, open may not have been called because
> max.open.instances reached");
> - return new InterpreterResult(Code.ERROR,
> - "interpreter == null\n" +
> - "open may not have been called because max.open.instances reached"
> - );
> - }
> - if (cmd == null || cmd.trim().length() == 0) {
> - return new InterpreterResult(Code.SUCCESS);
> - }
> - InterpreterResult interpreterResult = new
> InterpreterResult(Code.ERROR);
> - if (getProperty(ARGS_STRING).contains("hdfs")) {
> - UserGroupInformation ugi = null;
> - try {
> - ugi = UserGroupInformation.createProxyUser(user,
> UserGroupInformation.getLoginUser());
> - } catch (IOException e) {
> - LOGGER.error("Error creating UserGroupInformation", e);
> - return new InterpreterResult(Code.ERROR, e.getMessage());
> - }
> - try {
> - // Make variables final to avoid "local variable is accessed from
> within inner class;
> - // needs to be declared final" exception in JDK7
> - final String cmd1 = cmd;
> - final InterpreterContext contextInterpreter1 = contextInterpreter;
> - PrivilegedExceptionAction<InterpreterResult> action =
> - new PrivilegedExceptionAction<InterpreterResult>() {
> - @Override
> - public InterpreterResult run() throws Exception {
> - return interpret(cmd1.split("\n"), contextInterpreter1);
> - }
> - };
> - interpreterResult = ugi.doAs(action);
> - } catch (Exception e) {
> - LOGGER.error("Error running command with ugi.doAs", e);
> - return new InterpreterResult(Code.ERROR, e.getMessage());
> - }
> - } else {
> - interpreterResult = interpret(cmd.split("\n"), contextInterpreter);
> - }
> - return interpreterResult;
> - }
> -
> - public InterpreterResult interpret(String[] lines, InterpreterContext
> context) {
> - synchronized (this) {
> - InterpreterResult r = interpretInput(lines);
> - return r;
> - }
> - }
> -
> - public InterpreterResult interpretInput(String[] lines) {
> -
> - // add print("") to make sure not finishing with comment
> - // see https://github.com/NFLabs/zeppelin/issues/151
> - String[] linesToRun = new String[lines.length + 1];
> - for (int i = 0; i < lines.length; i++) {
> - linesToRun[i] = lines[i];
> - }
> - linesToRun[lines.length] = "print(\"\")";
> -
> - out.reset();
> -
> - // Moving two lines below from open() to this function.
> - // If they are in open output is incomplete.
> - PrintStream printStream = new PrintStream(out, true);
> - Console.setOut(printStream);
> -
> - Code r = null;
> - String incomplete = "";
> - boolean inComment = false;
> -
> - for (int l = 0; l < linesToRun.length; l++) {
> - String s = linesToRun[l];
> - // check if next line starts with "." (but not ".." or "./") it is
> treated as an invocation
> - if (l + 1 < linesToRun.length) {
> - String nextLine = linesToRun[l + 1].trim();
> - boolean continuation = false;
> - if (nextLine.isEmpty()
> - || nextLine.startsWith("//") // skip empty line
> or comment
> - || nextLine.startsWith("}")
> - || nextLine.startsWith("object")) { // include "} object"
> for Scala companion object
> - continuation = true;
> - } else if (!inComment && nextLine.startsWith("/*")) {
> - inComment = true;
> - continuation = true;
> - } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
> - inComment = false;
> - continuation = true;
> - } else if (nextLine.length() > 1
> - && nextLine.charAt(0) == '.'
> - && nextLine.charAt(1) != '.' // ".."
> - && nextLine.charAt(1) != '/') { // "./"
> - continuation = true;
> - } else if (inComment) {
> - continuation = true;
> - }
> - if (continuation) {
> - incomplete += s + "\n";
> - continue;
> - }
> - }
> -
> - scala.tools.nsc.interpreter.Results.Result res = null;
> - try {
> - res = interpreter.intp().interpret(incomplete + s);
> - } catch (Exception e) {
> - LOGGER.error("Interpreter exception: ", e);
> - return new InterpreterResult(Code.ERROR, e.getMessage());
> - }
> -
> - r = getResultCode(res);
> -
> - if (r == Code.ERROR) {
> - Console.flush();
> - return new InterpreterResult(r, out.toString());
> - } else if (r == Code.INCOMPLETE) {
> - incomplete += s + "\n";
> - } else {
> - incomplete = "";
> - }
> - }
> - if (r == Code.INCOMPLETE) {
> - return new InterpreterResult(r, "Incomplete expression");
> - } else {
> - Console.flush();
> - return new InterpreterResult(r, out.toString());
> - }
> - }
> -
> - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result
> r) {
> - if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
> - return Code.SUCCESS;
> - } else if (r instanceof
> scala.tools.nsc.interpreter.Results.Incomplete$) {
> - return Code.INCOMPLETE;
> - } else {
> - return Code.ERROR;
> - }
> - }
> -
> - @Override
> - public void cancel(InterpreterContext context) {
> - // not implemented
> - }
> -
> - @Override
> - public FormType getFormType() {
> - return FormType.NATIVE;
> - }
> -
> - @Override
> - public int getProgress(InterpreterContext context) {
> - // fine-grained progress not implemented - return 0
> - return 0;
> - }
> -
> - @Override
> - public Scheduler getScheduler() {
> - return SchedulerFactory.singleton().createOrGetFIFOScheduler(
> - ScaldingInterpreter.class.getName() + this.hashCode());
> - }
> -
> - @Override
> - public List<InterpreterCompletion> completion(String buf, int cursor,
> - InterpreterContext interpreterContext) {
> - return NO_COMPLETION;
> - }
> -
> -}
> diff --git a/scalding/src/main/resources/interpreter-setting.json
> b/scalding/src/main/resources/interpreter-setting.json
> deleted file mode 100644
> index ca6cd9295a..0000000000
> --- a/scalding/src/main/resources/interpreter-setting.json
> +++ /dev/null
> @@ -1,21 +0,0 @@
> -[
> - {
> - "group": "scalding",
> - "name": "scalding",
> - "className": "org.apache.zeppelin.scalding.ScaldingInterpreter",
> - "properties": {
> - "args.string": {
> - "envName": null,
> - "defaultValue": "--local --repl",
> - "description": "Arguments for scalding REPL",
> - "type": "textarea"
> - },
> - "max.open.instances": {
> - "envName": null,
> - "defaultValue": "50",
> - "description": "Maximum number of open interpreter instances",
> - "type": "number"
> - }
> - }
> - }
> -]
> diff --git
> a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
> b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
> deleted file mode 100644
> index b847eba001..0000000000
> ---
> a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala
> +++ /dev/null
> @@ -1,48 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to You under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.zeppelin.scalding
> -
> -/**
> - * Stores REPL state
> - */
> -
> -import cascading.flow.FlowDef
> -import com.twitter.scalding.BaseReplState
> -import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext }
> -import scala.concurrent.Future
> -import scala.util.{Failure, Success}
> -
> -object ZeppelinReplState extends BaseReplState {
> - override def shell = ZeppelinScaldingShell
> -}
> -
> -/**
> - * Implicit FlowDef and Mode, import in the REPL to have the global
> context implicitly
> - * used everywhere.
> - */
> -object ZeppelinReplImplicitContext {
> - /** Implicit execution context for using the Execution monad */
> - implicit val executionContext = ConcurrentExecutionContext.global
> - /** Implicit repl state used for ShellPipes */
> - implicit def stateImpl = ZeppelinReplState
> - /** Implicit flowDef for this Scalding shell session. */
> - implicit def flowDefImpl = ZeppelinReplState.flowDef
> - /** Defaults to running in local mode if no mode is specified. */
> - implicit def modeImpl = ZeppelinReplState.mode
> - implicit def configImpl = ZeppelinReplState.config
> -}
> diff --git
> a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
> b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
> deleted file mode 100644
> index 9be0199869..0000000000
> ---
> a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala
> +++ /dev/null
> @@ -1,46 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to You under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.zeppelin.scalding
> -
> -import java.io.BufferedReader
> -import com.twitter.scalding.ScaldingILoop
> -
> -import scala.tools.nsc.interpreter._
> -
> -/**
> - * TBD
> - */
> -class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter)
> - extends ScaldingILoop(in, out) {
> -
> - override protected def imports = List(
> - "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop,
> ScaldingShell => ScaldingScaldingShell, _ }",
> - // ReplImplicits minus fields API parts (esp FieldConversions)
> - """com.twitter.scalding.ReplImplicits.{
> - iterableToSource,
> - keyedListLikeToShellTypedPipe,
> - typedPipeToShellTypedPipe,
> - valuePipeToShellValuePipe
> - }""",
> - "com.twitter.scalding.ReplImplicits",
> - "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._",
> - "org.apache.zeppelin.scalding.ZeppelinReplState",
> - "org.apache.zeppelin.scalding.ZeppelinReplState._"
> - )
> -
> -}
> diff --git
> a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
> b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
> deleted file mode 100644
> index 29e5f835cb..0000000000
> ---
> a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala
> +++ /dev/null
> @@ -1,72 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to You under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.zeppelin.scalding
> -
> -import com.twitter.scalding._
> -import com.twitter.scalding.typed.TypedPipe
> -import scala.tools.nsc.{GenericRunnerCommand}
> -import scala.tools.nsc.interpreter._
> -
> -/**
> - * TBD
> - */
> -object ZeppelinScaldingShell extends BaseScaldingShell {
> -
> - override def replState = ZeppelinReplState
> -
> - def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = {
> -
> - val argsExpanded = ExpandLibJarsGlobs(args)
> - val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded)
> -
> - // Process command line arguments into a settings object, and use
> that to start the REPL.
> - // We ignore params we don't care about - hence error function is
> empty
> - val command = new GenericRunnerCommand(cmdArgs, _ => ())
> -
> - // inherit defaults for embedded interpretter (needed for running
> with SBT)
> - // (TypedPipe chosen arbitrarily, just needs to be something
> representative)
> - command.settings.embeddedDefaults[TypedPipe[String]]
> -
> - // if running from the assembly, need to explicitly tell it to use
> java classpath
> - if (args.contains("--repl")) command.settings.usejavacp.value = true
> -
> -
> command.settings.classpath.append(System.getProperty("java.class.path"))
> -
> - // Force the repl to be synchronous, so all cmds are executed in the
> same thread
> - command.settings.Yreplsync.value = true
> -
> - val repl = new ZeppelinScaldingILoop(None, out)
> - scaldingREPL = Some(repl)
> - replState.mode = mode
> - replState.customConfig = replState.customConfig ++ (mode match {
> - case _: HadoopMode => cfg
> - case _ => Config.empty
> - })
> -
> - // if in Hdfs mode, store the mode to enable switching between Local
> and Hdfs
> - mode match {
> - case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m)
> - case _ => ()
> - }
> -
> - repl.settings = command.settings
> - return repl;
> -
> - }
> -
> -}
> diff --git
> a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
> b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
> deleted file mode 100644
> index 992c15594f..0000000000
> ---
> a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
> +++ /dev/null
> @@ -1,144 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one or more
> - * contributor license agreements. See the NOTICE file distributed with
> - * this work for additional information regarding copyright ownership.
> - * The ASF licenses this file to You under the Apache License, Version 2.0
> - * (the "License"); you may not use this file except in compliance with
> - * the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.zeppelin.scalding;
> -
> -import org.apache.zeppelin.interpreter.InterpreterContext;
> -import org.apache.zeppelin.interpreter.InterpreterResult;
> -import org.apache.zeppelin.interpreter.InterpreterResult.Code;
> -import org.apache.zeppelin.user.AuthenticationInfo;
> -import org.junit.After;
> -import org.junit.Before;
> -import org.junit.FixMethodOrder;
> -import org.junit.Test;
> -import org.junit.runners.MethodSorters;
> -
> -import java.io.File;
> -import java.util.Properties;
> -
> -import static org.junit.Assert.assertEquals;
> -import static org.junit.Assert.assertTrue;
> -
> -/**
> - * Tests for the Scalding interpreter for Zeppelin.
> - *
> - */
> -@FixMethodOrder(MethodSorters.NAME_ASCENDING)
> -public class ScaldingInterpreterTest {
> - public static ScaldingInterpreter repl;
> - private InterpreterContext context;
> - private File tmpDir;
> -
> - @Before
> - public void setUp() throws Exception {
> - tmpDir = new File(System.getProperty("java.io.tmpdir") +
> "/ZeppelinLTest_" +
> - System.currentTimeMillis());
> - System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath()
> + "/local-repo");
> -
> - tmpDir.mkdirs();
> -
> - if (repl == null) {
> - Properties p = new Properties();
> - p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl");
> -
> - repl = new ScaldingInterpreter(p);
> - repl.open();
> - }
> -
> - context = InterpreterContext.builder()
> - .setNoteId("noteId")
> - .setParagraphId("paragraphId")
> - .setAuthenticationInfo(new AuthenticationInfo())
> - .build();
> - }
> -
> - @After
> - public void tearDown() throws Exception {
> - delete(tmpDir);
> - repl.close();
> - }
> -
> - private void delete(File file) {
> - if (file.isFile()) {
> - file.delete();
> - } else if (file.isDirectory()) {
> - File[] files = file.listFiles();
> - if (files != null && files.length > 0) {
> - for (File f : files) {
> - delete(f);
> - }
> - }
> - file.delete();
> - }
> - }
> -
> - @Test
> - public void testNextLineComments() {
> - assertEquals(InterpreterResult.Code.SUCCESS,
> - repl.interpret("\"123\"\n/*comment here\n*/.toInt",
> context).code());
> - }
> -
> - @Test
> - public void testNextLineCompanionObject() {
> - String code = "class Counter {\nvar value: Long = 0\n}\n //
> comment\n\n object Counter " +
> - "{\n def apply(x: Long) = new Counter()\n}";
> - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code,
> context).code());
> - }
> -
> - @Test
> - public void testBasicIntp() {
> - assertEquals(InterpreterResult.Code.SUCCESS,
> - repl.interpret("val a = 1\nval b = 2", context).code());
> -
> - // when interpret incomplete expression
> - InterpreterResult incomplete = repl.interpret("val a = \"\"\"",
> context);
> - assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
> - assertTrue(incomplete.message().get(0).getData().length() > 0); //
> expecting some error
> - // message
> - }
> -
> - @Test
> - public void testBasicScalding() {
> - assertEquals(InterpreterResult.Code.SUCCESS,
> - repl.interpret("case class Sale(state: String, name: String,
> sale: Int)\n" +
> - "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\",
> \"A\", 20), " +
> - "Sale(\"VA\", \"B\", 15))\n" +
> - "val salesPipe = TypedPipe.from(salesList)\n" +
> - "val results = salesPipe.map{x => (1, Set(x.state),
> x.sale)}.\n" +
> - " groupAll.sum.values.map{ case(count, set, sum) => (count,
> set.size, sum) }\n" +
> - "results.dump",
> - context).code());
> - }
> -
> - @Test
> - public void testNextLineInvocation() {
> - assertEquals(InterpreterResult.Code.SUCCESS,
> repl.interpret("\"123\"\n.toInt", context).code());
> - }
> -
> - @Test
> - public void testEndWithComment() {
> - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val
> c=1\n//comment",
> - context).code());
> - }
> -
> - @Test
> - public void testReferencingUndefinedVal() {
> - InterpreterResult result = repl.interpret("def category(min: Int) = {"
> - + " if (0 <= value) \"error\"" + "}", context);
> - assertEquals(Code.ERROR, result.code());
> - }
> -}
>
>