Posted to dev@submarine.apache.org by li...@apache.org on 2019/11/26 16:06:25 UTC
[submarine] branch master updated: SUBMARINE-277. Support Spark
Interpreter add sparkSQL interpreter
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 6cd62e3 SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter
6cd62e3 is described below
commit 6cd62e333432e2352e337db819e15e60f84145df
Author: xunix huang <qi...@qq.com>
AuthorDate: Tue Nov 26 22:39:48 2019 +0800
SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter
### What is this PR for?
Add a SparkSQL interpreter and improve the structure of the interpreter module.
### What type of PR is it?
Feature and Improvement
### Todos
* [*] - add sparkSQL interpreter
* [*] - add Abstract interpreter
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-277
### How should this be tested?
https://travis-ci.org/hhhizzz/submarine/builds/617065629
### Questions:
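* Do the license files need an update? No
* Are there breaking changes for older versions? Yes. InterpreterProcess no longer implements Interpreter; concrete interpreters such as PythonInterpreter now extend AbstractInterpreter instead.
* Does this need documentation? No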
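
As a rough illustration of one piece of the restructuring, the sketch below mimics how `AbstractInterpreter#mergeZeppelinInterpreterProperties` rewrites `submarine` property keys to `zeppelin` keys, and how `PythonInterpreter` layers its defaults underneath caller-supplied values. The class name `PropertyKeySketch` and the helper names `toZeppelinKeys` and `withPythonDefaults` are hypothetical and used only for this example; the real methods live in the classes added by this commit and depend on Zeppelin.

```java
import java.util.Properties;

// Hypothetical, dependency-free sketch; not the classes from this commit.
public class PropertyKeySketch {

  // Copies every property, replacing "submarine" with "zeppelin" in the key,
  // as the key rewrite in AbstractInterpreter does.
  static Properties toZeppelinKeys(Properties properties) {
    Properties newProps = new Properties();
    for (String key : properties.stringPropertyNames()) {
      newProps.put(key.replace("submarine", "zeppelin"), properties.getProperty(key));
    }
    return newProps;
  }

  // Starts from two of the Python defaults shown in the diff, then lets the
  // caller's (rewritten) properties override them.
  static Properties withPythonDefaults(Properties properties) {
    Properties merged = new Properties();
    merged.setProperty("zeppelin.python.maxResult", "1000");
    merged.setProperty("zeppelin.python.useIPython", "false");
    merged.putAll(toZeppelinKeys(properties));
    return merged;
  }

  public static void main(String[] args) {
    Properties user = new Properties();
    user.setProperty("submarine.python.maxResult", "50");

    Properties merged = withPythonDefaults(user);
    System.out.println(merged.getProperty("zeppelin.python.maxResult"));  // prints "50"
    System.out.println(merged.getProperty("zeppelin.python.useIPython")); // prints "false"
  }
}
```

Because the defaults are set first and the caller's values are applied afterwards, a caller can override any default by passing the corresponding `submarine.*` key.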
Author: xunix huang <qi...@qq.com>
Author: luzhonghao <lu...@163.com>
Closes #107 from hhhizzz/luzhonghao and squashes the following commits:
8dd305d [xunix huang] change exception
4fc0b34 [xunix huang] change exception
dfb2e3d [xunix huang] add license
e31f788 [xunix huang] shade the zeppelin package
409cdad [xunix huang] some fix
8ec5df2 [xunix huang] SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter
1654fbc [luzhonghao] sparkSqlInterpreter
---
.gitignore | 6 +
submarine-commons/commons-cluster/pom.xml | 2 +-
submarine-dist/src/assembly/distribution.xml | 7 -
.../interpreter/interpreter-engine/pom.xml | 1 -
.../submarine/interpreter/AbstractInterpreter.java | 141 +++++++++++++
.../apache/submarine/interpreter/Interpreter.java | 19 +-
.../interpreter/InterpreterException.java | 42 ++++
.../submarine/interpreter/InterpreterGroup.java | 23 ++
.../submarine/interpreter/InterpreterProcess.java | 213 ++++++-------------
.../interpreter/python-interpreter/pom.xml | 2 +-
.../submarine/interpreter/PythonInterpreter.java | 107 +++-------
.../interpreter/InterpreterClusterTest.java | 3 +-
.../interpreter/PythonInterpreterTest.java | 37 ++--
.../interpreter/spark-interpreter/README.md | 30 ++-
.../interpreter/spark-interpreter/pom.xml | 56 ++---
.../submarine/interpreter/SparkInterpreter.java | 166 ++++-----------
.../submarine/interpreter/SparkSqlInterpreter.java | 89 ++++++++
.../interpreter/SparkInterpreterTest.java | 233 ++++++++-------------
.../interpreter/SparkSqlInterpreterTest.java | 204 ++++++++++++++++++
.../src/test/resources/hive-site.xml | 7 +
.../src/test/resources/log4j.properties | 35 ++++
21 files changed, 856 insertions(+), 567 deletions(-)
diff --git a/.gitignore b/.gitignore
index 521d78c..f614fea 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,9 @@ submarine-workbench/workbench-web-ng/dist
submarine-workbench/workbench-web-ng/package-lock.json
submarine-workbench/workbench-web-ng/npm-debug.log*
submarine-test/e2e/Driver
+
+# interpreter temp files
+derby.log
+submarine-workbench/interpreter/spark-interpreter/metastore_db/
+spark-1.*-bin-hadoop*
+.spark-dist
diff --git a/submarine-commons/commons-cluster/pom.xml b/submarine-commons/commons-cluster/pom.xml
index d0eb4c0..f1dbb22 100644
--- a/submarine-commons/commons-cluster/pom.xml
+++ b/submarine-commons/commons-cluster/pom.xml
@@ -158,7 +158,7 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.submarine.interpreter.InterpreterProcess</mainClass>
+ <mainClass>org.apache.submarine.interpreter.AbstractInterpreter</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
diff --git a/submarine-dist/src/assembly/distribution.xml b/submarine-dist/src/assembly/distribution.xml
index ebd9794..c8cc582 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -149,13 +149,6 @@
</includes>
</fileSet>
<fileSet>
- <directory>../submarine-workbench/interpreter/spark/scala-2.11</directory>
- <outputDirectory>/lib/interpreter/spark/scala-2.11</outputDirectory>
- <includes>
- <include>spark-scala-2.11-${zeppelin.version}.jar</include>
- </includes>
- </fileSet>
- <fileSet>
<directory>../submarine-workbench/interpreter/spark-interpreter/target</directory>
<outputDirectory>/lib/interpreter/spark</outputDirectory>
<includes>
diff --git a/submarine-workbench/interpreter/interpreter-engine/pom.xml b/submarine-workbench/interpreter/interpreter-engine/pom.xml
index 8d8e838..01d9129 100644
--- a/submarine-workbench/interpreter/interpreter-engine/pom.xml
+++ b/submarine-workbench/interpreter/interpreter-engine/pom.xml
@@ -105,7 +105,6 @@
<scope>test</scope>
</dependency>
</dependencies>
-
<build>
<finalName>submarine-${artifactId}-${project.version}</finalName>
<plugins>
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/AbstractInterpreter.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/AbstractInterpreter.java
new file mode 100644
index 0000000..d319838
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/AbstractInterpreter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Entry point for Submarine Interpreter process.
+ * Accepting thrift connections from Submarine Workbench Server.
+ */
+public abstract class AbstractInterpreter implements Interpreter {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractInterpreter.class);
+
+ protected org.apache.zeppelin.interpreter.Interpreter zeppelinInterpreter;
+
+ private InterpreterContext interpreterContext;
+
+ protected InterpreterContext getIntpContext() {
+ if (this.interpreterContext == null) {
+ this.interpreterContext = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .build();
+ InterpreterContext.set(this.interpreterContext);
+ }
+ return this.interpreterContext;
+ }
+
+ public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+ this.zeppelinInterpreter.setInterpreterGroup(interpreterGroup);
+ }
+
+ public InterpreterGroup getInterpreterGroup() {
+ return this.zeppelinInterpreter.getInterpreterGroup();
+ }
+
+ protected Properties mergeZeppelinInterpreterProperties(Properties properties) {
+ Properties newProps = new Properties();
+
+ for (String key : properties.stringPropertyNames()) {
+ String newKey = key.replace("submarine", "zeppelin");
+ newProps.put(newKey, properties.getProperty(key));
+ }
+
+ return newProps;
+ }
+
+ @Override
+ public InterpreterResult interpret(String code) throws InterpreterException {
+ InterpreterResult interpreterResult = null;
+ try {
+ org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
+ = this.zeppelinInterpreter.interpret(code, getIntpContext());
+ interpreterResult = new InterpreterResult(zeplInterpreterResult);
+
+ List<InterpreterResultMessage> interpreterResultMessages =
+ getIntpContext().out.toInterpreterResultMessage();
+
+ for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
+ interpreterResult.add(message);
+ }
+ } catch (org.apache.zeppelin.interpreter.InterpreterException | IOException e) {
+ LOG.error(e.getMessage(), e);
+ throw new InterpreterException(e);
+ }
+
+ return interpreterResult;
+ }
+
+ @Override
+ public void open() throws InterpreterException {
+ getIntpContext();
+ try {
+ this.zeppelinInterpreter.open();
+ } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+ LOG.error(e.getMessage(), e);
+ throw new org.apache.submarine.interpreter.InterpreterException(e);
+ }
+ }
+
+ @Override
+ public void close() throws InterpreterException {
+ try {
+ this.zeppelinInterpreter.close();
+ } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+ LOG.error(e.getMessage(), e);
+ throw new org.apache.submarine.interpreter.InterpreterException(e);
+ }
+ }
+
+ @Override
+ public void cancel() throws InterpreterException {
+ try {
+ this.zeppelinInterpreter.cancel(getIntpContext());
+ } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+ LOG.error(e.getMessage(), e);
+ throw new org.apache.submarine.interpreter.InterpreterException(e);
+ }
+ }
+
+ @Override
+ public int getProgress() throws InterpreterException {
+ int process = 0;
+ try {
+ process = this.zeppelinInterpreter.getProgress(getIntpContext());
+ } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+ LOG.error(e.getMessage(), e);
+ throw new org.apache.submarine.interpreter.InterpreterException(e);
+ }
+ return process;
+ }
+
+ @Override
+ public void addToSession(String session) {
+ this.zeppelinInterpreter.getInterpreterGroup().get(session).add(this.zeppelinInterpreter);
+ }
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
index 9681ef4..ae7cdfb 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
@@ -18,12 +18,17 @@
package org.apache.submarine.interpreter;
public interface Interpreter {
- void shutdown();
- boolean isRunning();
- void open();
- InterpreterResult interpret(String code);
- void close();
- void cancel();
- int getProgress();
+ void open() throws InterpreterException;
+
+ InterpreterResult interpret(String code) throws InterpreterException;
+
+ void close() throws InterpreterException;
+
+ void cancel() throws InterpreterException;
+
+ int getProgress() throws InterpreterException;
+
boolean test();
+
+ void addToSession(String session);
}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterException.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterException.java
new file mode 100644
index 0000000..636d1fe
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+public class InterpreterException extends org.apache.zeppelin.interpreter.InterpreterException {
+
+ public InterpreterException() {
+ }
+
+ public InterpreterException(Throwable e) {
+ super(e);
+ }
+
+ public InterpreterException(String m) {
+ super(m);
+ }
+
+ public InterpreterException(String msg, Throwable t) {
+ super(msg, t);
+ }
+
+ public InterpreterException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterGroup.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterGroup.java
new file mode 100644
index 0000000..bfb0d5f
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterGroup.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+public class InterpreterGroup extends org.apache.zeppelin.interpreter.InterpreterGroup {
+
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
index 0e40020..1e9a50c 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
@@ -22,101 +22,55 @@ import org.apache.commons.lang.StringUtils;
import org.apache.submarine.commons.cluster.ClusterClient;
import org.apache.submarine.commons.cluster.meta.ClusterMeta;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
-import sun.misc.SignalHandler;
-import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketException;
-import java.net.URL;
import java.net.URLClassLoader;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
-/**
- * Entry point for Submarine Interpreter process.
- * Accepting thrift connections from Submarine Workbench Server.
- */
-public class InterpreterProcess extends Thread implements Interpreter {
- private static final Logger LOG = LoggerFactory.getLogger(InterpreterProcess.class);
+public class InterpreterProcess extends Thread {
- // cluster manager client
- private ClusterClient clusterClient = ClusterClient.getInstance();
+ private static Logger LOG = LoggerFactory.getLogger(InterpreterProcess.class);
- private SubmarineConfiguration sconf = SubmarineConfiguration.getInstance();
+ private Interpreter interpreter;
- protected String interpreterId;
-
- private InterpreterProcess interpreterProcess;
+ private static InterpreterProcess process;
private AtomicBoolean isRunning = new AtomicBoolean(false);
- public static void main(String[] args) throws InterruptedException, IOException {
- String interpreterType = args[0];
- String interpreterId = args[1];
- Boolean onlyTest = false;
- if (args.length == 3 && StringUtils.equals(args[2], "test")) {
- onlyTest = true;
- }
-
- InterpreterProcess interpreterProcess = new InterpreterProcess(interpreterType, interpreterId, onlyTest);
- interpreterProcess.start();
-
- // add signal handler
- Signal.handle(new Signal("TERM"), new SignalHandler() {
- @Override
- public void handle(Signal signal) {
- // clean
- LOG.info("handle signal:{}", signal);
- }
- });
-
- interpreterProcess.join();
- System.exit(0);
- }
+ // cluster manager client
+ private ClusterClient clusterClient = ClusterClient.getInstance();
- @Override
- public void run() {
- isRunning.set(true);
- while (isRunning.get()) {
- try {
- // TODO(Xun Liu): Next PR will add Thrift Server in here
- LOG.info("Mock TServer run ...");
- sleep(1000);
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- }
- }
- }
+ private SubmarineConfiguration sconf = SubmarineConfiguration.getInstance();
- public boolean isRunning() {
- return isRunning.get();
- }
+ protected String interpreterId;
public InterpreterProcess() { }
public InterpreterProcess(String interpreterType, String interpreterId, Boolean onlyTest)
- throws IOException {
+ throws IOException {
this.interpreterId = interpreterId;
- this.interpreterProcess = loadInterpreterPlugin(interpreterType);
+ try {
+ loadInterpreterPlugin(interpreterType);
+ } catch (InterpreterException e) {
+ LOG.error(e.getMessage(), e);
+ System.exit(e.hashCode());
+ }
- if (true == onlyTest) {
- boolean testResult = interpreterProcess.test();
+ if (onlyTest) {
+ boolean testResult = interpreter.test();
LOG.info("Interpreter test result: {}", testResult);
System.exit(0);
return;
@@ -151,38 +105,38 @@ public class InterpreterProcess extends Thread implements Interpreter {
}
// get super interpreter class name
- private String getSuperInterpreterClassName(String intpName) {
+ private String getSuperInterpreterClassName(String intpName) throws InterpreterException {
String superIntpClassName = "";
if (StringUtils.equals(intpName, "python")) {
superIntpClassName = "org.apache.submarine.interpreter.PythonInterpreter";
} else if (StringUtils.equals(intpName, "spark")) {
superIntpClassName = "org.apache.submarine.interpreter.SparkInterpreter";
+ } else if (StringUtils.equals(intpName, "sparksql")) {
+ superIntpClassName = "org.apache.submarine.interpreter.SparkSqlInterpreter";
} else {
- superIntpClassName = "org.apache.submarine.interpreter.InterpreterProcess";
+ throw new InterpreterException("cannot recognize the interpreter: " + intpName);
}
return superIntpClassName;
}
- public synchronized InterpreterProcess loadInterpreterPlugin(String pluginName)
- throws IOException {
+ public synchronized void loadInterpreterPlugin(String pluginName)
+ throws IOException, InterpreterException {
LOG.info("Loading Plug name: {}", pluginName);
String pluginClassName = getSuperInterpreterClassName(pluginName);
LOG.info("Loading Plug Class name: {}", pluginClassName);
// load plugin from classpath directly first for these builtin Interpreter Plugin.
- InterpreterProcess intpProcess = loadPluginFromClassPath(pluginClassName, null);
- if (intpProcess == null) {
+ this.interpreter = loadPluginFromClassPath(pluginClassName, null);
+ if (this.interpreter == null) {
throw new IOException("Fail to load plugin: " + pluginName);
}
-
- return intpProcess;
}
- private InterpreterProcess loadPluginFromClassPath(
- String pluginClassName, URLClassLoader pluginClassLoader) {
- InterpreterProcess intpProcess = null;
+ private Interpreter loadPluginFromClassPath(
+ String pluginClassName, URLClassLoader pluginClassLoader) {
+ Interpreter intpProcess = null;
try {
Class<?> clazz = null;
if (null == pluginClassLoader) {
@@ -202,107 +156,58 @@ public class InterpreterProcess extends Thread implements Interpreter {
LOG.debug(method.getName());
}
- intpProcess = (InterpreterProcess) (clazz.getConstructor().newInstance());
+ intpProcess = (Interpreter) (clazz.getConstructor().newInstance());
return intpProcess;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException
- | NoSuchMethodException | InvocationTargetException e) {
+ | NoSuchMethodException | InvocationTargetException e) {
LOG.warn("Fail to instantiate InterpreterLauncher from classpath directly:", e);
}
return intpProcess;
}
- // Get the class load from the specified path
- private URLClassLoader getPluginClassLoader(String pluginsDir, String pluginName)
- throws IOException {
- File pluginFolder = new File(pluginsDir + "/" + pluginName);
- if (!pluginFolder.exists() || pluginFolder.isFile()) {
- LOG.warn("PluginFolder {} doesn't exist or is not a directory", pluginFolder.getAbsolutePath());
- return null;
- }
- List<URL> urls = new ArrayList<>();
- for (File file : pluginFolder.listFiles()) {
- LOG.debug("Add file {} to classpath of plugin: {}", file.getAbsolutePath(), pluginName);
- urls.add(file.toURI().toURL());
- }
- if (urls.isEmpty()) {
- LOG.warn("Can not load plugin {}, because the plugin folder {} is empty.", pluginName, pluginFolder);
- return null;
- }
- return new URLClassLoader(urls.toArray(new URL[0]));
- }
-
- protected Properties mergeZeplPyIntpProp(Properties newProps) {
- Properties properties = new Properties();
- // Max number of dataframe rows to display.
- properties.setProperty("zeppelin.python.maxResult", "1000");
- // whether use IPython when it is available
- properties.setProperty("zeppelin.python.useIPython", "false");
- properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
-
- if (null != newProps) {
- newProps.putAll(properties);
- return newProps;
- } else {
- return properties;
- }
- }
-
- protected Properties mergeZeplSparkIntpProp(Properties newProps) {
- Properties properties = new Properties();
-
- properties.setProperty("zeppelin.spark.maxResult", "1000");
- properties.setProperty("zeppelin.spark.scala.color", "false");
-
- if (null != newProps) {
- newProps.putAll(properties);
- return newProps;
- } else {
- return properties;
- }
+ public boolean isRunning() {
+ return isRunning.get();
}
- protected static InterpreterContext getIntpContext() {
- return InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .build();
- }
- @Override
public void shutdown() {
isRunning.set(false);
}
- @Override
- public void open() {
- LOG.error("Please implement the open() method of the child class!");
- }
- @Override
- public InterpreterResult interpret(String code) {
- LOG.error("Please implement the interpret() method of the child class!");
- return null;
- }
+ public static void main(String[] args) throws InterruptedException, IOException {
+ String interpreterType = args[0];
+ String interpreterId = args[1];
+ boolean onlyTest = false;
+ if (args.length == 3 && StringUtils.equals(args[2], "test")) {
+ onlyTest = true;
+ }
- @Override
- public void close() {
- LOG.error("Please implement the close() method of the child class!");
- }
+ InterpreterProcess interpreterProcess = new InterpreterProcess(interpreterType, interpreterId, onlyTest);
+ interpreterProcess.start();
- @Override
- public void cancel() {
- LOG.error("Please implement the cancel() method of the child class!");
- }
+ // add signal handler
+ Signal.handle(new Signal("TERM"), signal -> {
+ // clean
+ LOG.info("handle signal:{}", signal);
+ });
- @Override
- public int getProgress() {
- LOG.error("Please implement the getProgress() method of the child class!");
- return 0;
+ interpreterProcess.join();
+ System.exit(0);
}
@Override
- public boolean test() {
- LOG.error("Please implement the test() method of the child class!");
- return false;
+ public void run() {
+ isRunning.set(true);
+ while (isRunning.get()) {
+ try {
+ // TODO(Xun Liu): Next PR will add Thrift Server in here
+ LOG.info("Mock TServer run ...");
+ sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
}
}
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 7a18634..d589a94 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -228,7 +228,7 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.submarine.interpreter.InterpreterProcess</mainClass>
+ <mainClass>org.apache.submarine.interpreter.AbstractInterpreter</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
index e413684..699a85e 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
@@ -19,104 +19,61 @@
package org.apache.submarine.interpreter;
import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
import java.util.Properties;
-public class PythonInterpreter extends InterpreterProcess {
+public class PythonInterpreter extends AbstractInterpreter {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
- private org.apache.zeppelin.python.PythonInterpreter zplePythonInterpreter;
- private InterpreterContext intpContext;
-
public PythonInterpreter() {
- Properties properties = new Properties();
- properties = mergeZeplPyIntpProp(properties);
- zplePythonInterpreter = new org.apache.zeppelin.python.PythonInterpreter(properties);
- zplePythonInterpreter.setInterpreterGroup(new InterpreterGroup());
- intpContext = this.getIntpContext();
- }
-
- @Override
- public void open() {
- try {
- zplePythonInterpreter.open();
- } catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
- }
+ this(new Properties());
}
- @Override
- public InterpreterResult interpret(String code) {
- InterpreterResult interpreterResult = null;
- try {
- org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
- = zplePythonInterpreter.interpret(code, intpContext);
- interpreterResult = new InterpreterResult(zeplInterpreterResult);
-
- List<org.apache.zeppelin.interpreter.InterpreterResultMessage> interpreterResultMessages =
- intpContext.out.toInterpreterResultMessage();
-
- for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
- interpreterResult.add(message);
- }
- } catch (InterpreterException | IOException e) {
- LOG.error(e.getMessage(), e);
- }
-
- return interpreterResult;
- }
-
- @Override
- public void close() {
- try {
- zplePythonInterpreter.close();
- } catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
- }
+ public PythonInterpreter(Properties properties){
+ properties = mergeZeppelinInterpreterProperties(properties);
+ this.zeppelinInterpreter = new org.apache.zeppelin.python.PythonInterpreter(properties);
+ this.zeppelinInterpreter.setInterpreterGroup(new InterpreterGroup());
}
@Override
- public void cancel() {
- try {
- zplePythonInterpreter.cancel(intpContext);
- } catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
+ protected Properties mergeZeppelinInterpreterProperties(Properties properties) {
+ properties = super.mergeZeppelinInterpreterProperties(properties);
+ Properties newProps = new Properties();
+ // Max number of dataframe rows to display.
+ newProps.setProperty("zeppelin.python.maxResult", "1000");
+ // whether use IPython when it is available
+ newProps.setProperty("zeppelin.python.useIPython", "false");
+ newProps.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
+
+ if (null != properties) {
+ newProps.putAll(properties);
}
+ return newProps;
}
@Override
- public int getProgress() {
- int process = 0;
+ public boolean test() {
try {
- process = zplePythonInterpreter.getProgress(intpContext);
+ open();
+ String code = "1 + 1";
+ InterpreterResult result = interpret(code);
+ LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}",
+ code, result.message().get(0).getData()
+ );
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ return false;
+ }
+ if (StringUtils.equals(result.message().get(0).getData(), "2\n")) {
+ return true;
+ }
+ return false;
} catch (InterpreterException e) {
LOG.error(e.getMessage(), e);
- }
-
- return process;
- }
-
- @Override
- public boolean test() {
- open();
- String code = "1 + 1";
- InterpreterResult result = interpret(code);
- LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}",
- code, result.message().get(0).getData());
-
- if (result.code() != InterpreterResult.Code.SUCCESS) {
return false;
}
- if (StringUtils.equals(result.message().get(0).getData(), "2\n")) {
- return true;
- }
- return false;
}
}
diff --git a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
index 2a2da4a..8fc91f7 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_META;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class InterpreterClusterTest {
private static Logger LOG = LoggerFactory.getLogger(InterpreterClusterTest.class);
@@ -154,6 +155,6 @@ public class InterpreterClusterTest {
}
Thread.sleep(200);
}
- assertEquals(false, interpreterProcess.isRunning());
+ assertFalse(interpreterProcess.isRunning());
}
}
diff --git a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java
index d897da0..8096ba7 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class PythonInterpreterTest {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterTest.class);
@@ -40,7 +41,7 @@ public class PythonInterpreterTest {
private static PythonInterpreter pythonInterpreterForClose = null;
@BeforeClass
- public static void setUp() {
+ public static void setUp() throws InterpreterException {
pythonInterpreterForCancel = new PythonInterpreter();
pythonInterpreterForClose = new PythonInterpreter();
pythonInterpreterForCancel.open();
@@ -48,7 +49,7 @@ public class PythonInterpreterTest {
}
@AfterClass
- public static void tearDown() {
+ public static void tearDown() throws InterpreterException {
if (null != pythonInterpreterForCancel) {
pythonInterpreterForCancel.close();
}
@@ -59,7 +60,7 @@ public class PythonInterpreterTest {
@Test
- public void calcOnePlusOne() {
+ public void calcOnePlusOne() throws InterpreterException {
String code = "1+1";
InterpreterResult result = pythonInterpreterForCancel.interpret(code);
LOG.info("result = {}", result);
@@ -69,30 +70,40 @@ public class PythonInterpreterTest {
assertEquals(result.message().get(0).getData(), "2\n");
}
- private class infinityPythonJobforCancel implements Runnable {
+ private static class infinityPythonJobforCancel implements Runnable {
@Override
public void run() {
String code = "import time\nwhile True:\n time.sleep(1)";
InterpreterResult ret = null;
- ret = pythonInterpreterForCancel.interpret(code);
- assertNotNull(ret);
- Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
- Matcher m = expectedMessage.matcher(ret.message().toString());
- assertTrue(m.find());
+ try {
+ ret = pythonInterpreterForCancel.interpret(code);
+ assertNotNull(ret);
+ Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
+ Matcher m = expectedMessage.matcher(ret.message().toString());
+ assertTrue(m.find());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ fail();
+ }
}
}
- private class infinityPythonJobforClose implements Runnable {
+ private static class infinityPythonJobforClose implements Runnable {
@Override
public void run() {
String code = "import time\nwhile True:\n time.sleep(1)";
- pythonInterpreterForClose.interpret(code);
+ try {
+ pythonInterpreterForClose.interpret(code);
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ fail();
+ }
}
}
@Test
- public void testCloseIntp() throws InterruptedException {
+ public void testCloseIntp() throws InterruptedException, InterpreterException {
assertEquals(InterpreterResult.Code.SUCCESS,
pythonInterpreterForClose.interpret("1+1\n").code());
Thread t = new Thread(new infinityPythonJobforClose());
@@ -106,7 +117,7 @@ public class PythonInterpreterTest {
@Test
- public void testCancelIntp() throws InterruptedException {
+ public void testCancelIntp() throws InterruptedException, InterpreterException {
assertEquals(InterpreterResult.Code.SUCCESS,
pythonInterpreterForCancel.interpret("1+1\n").code());
Thread t = new Thread(new infinityPythonJobforCancel());
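Reviewer note on the two Runnable jobs above: they call fail() from a spawned thread. In JUnit 4, fail() throws an AssertionError, and as far as I know an error thrown on a thread other than the one running the test method only ends that thread and does not fail the test. A minimal sketch of one way to surface such failures follows. ThreadFailureSketch and runAndCapture are hypothetical names for illustration and are not part of this commit.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone sketch; not part of the Submarine code base.
class ThreadFailureSketch {

  // Runs the job on a new thread, waits for it to finish, and returns
  // whatever it threw, or null if it completed normally.
  static Throwable runAndCapture(Runnable job) {
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread worker = new Thread(() -> {
      try {
        job.run();
      } catch (Throwable e) {
        // Keep the failure so the calling thread can inspect or rethrow it.
        failure.set(e);
      }
    });
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for worker", e);
    }
    return failure.get();
  }

  public static void main(String[] args) {
    Throwable captured = runAndCapture(() -> {
      throw new AssertionError("assertion failed on worker thread");
    });
    System.out.println(captured.getMessage());
  }
}
```

In a test, the calling thread could assert that the returned value is null, or rethrow it, so that the failure is reported by the test method itself.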
diff --git a/submarine-workbench/interpreter/spark-interpreter/README.md b/submarine-workbench/interpreter/spark-interpreter/README.md
index 9b210ce..3135511 100644
--- a/submarine-workbench/interpreter/spark-interpreter/README.md
+++ b/submarine-workbench/interpreter/spark-interpreter/README.md
@@ -17,13 +17,16 @@
## Test Submarine Spark Interpreter
### Execute test command
-```
-export SUBMARINE_HOME=/path/to/your/submarine_home
-java -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id test
+```bash
+# spark interpreter
+java -jar submarine-spark-interpreter-{{version}}-shade.jar spark spark-interpreter-id test
+# sparkSQL interpreter
+java -jar submarine-spark-interpreter-{{version}}-shade.jar sparksql sparkSQL-interpreter-id test
```
### Print test result
-```
+#### Spark
+```log
INFO [2019-11-09 11:12:04,888] ({main} ContextHandler.java[doStart]:781) - Started o.s.j.s.ServletContextHandler@58b97c15{/stages/stage/kill,null,AVAILABLE,@Spark}
INFO [2019-11-09 11:12:04,889] ({main} Logging.scala[logInfo]:54) - Bound SparkUI to 0.0.0.0, and started at http://10.0.0.3:4040
INFO [2019-11-09 11:12:04,923] ({main} Logging.scala[logInfo]:54) - Starting executor ID driver on host localhost
@@ -63,13 +66,30 @@ java -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id
INFO [2019-11-09 11:12:08,553] ({shutdown-hook-0} Logging.scala[logInfo]:54) - Deleting directory /private/var/folders/xl/_xb3fgzj5zd698khfz6z74cc0000gn/T/spark-2f4acad9-a72d-4bca-8d85-3ef310f0b08c
```
+#### SparkSQL
+```log
+ INFO [2019-11-25 19:13:05,993] ({task-result-getter-0} Logging.scala[logInfo]:54) - Finished task 0.0 in stage 0.0 (TID 0) in 79 ms on localhost (executor driver) (1/1)
+ INFO [2019-11-25 19:13:05,994] ({task-result-getter-0} Logging.scala[logInfo]:54) - Removed TaskSet 0.0, whose tasks have all completed, from pool
+ INFO [2019-11-25 19:13:05,997] ({dag-scheduler-event-loop} Logging.scala[logInfo]:54) - ResultStage 0 (takeAsList at Spark2Shims.java:65) finished in 0.093 s
+DEBUG [2019-11-25 19:13:05,999] ({dag-scheduler-event-loop} Logging.scala[logDebug]:58) - After removal of stage 0, remaining stages = 0
+ INFO [2019-11-25 19:13:06,000] ({main} Logging.scala[logInfo]:54) - Job 0 finished: takeAsList at Spark2Shims.java:65, took 0.744646 s
+ INFO [2019-11-25 19:13:06,006] ({main} SparkSqlInterpreter.java[test]:71) - Execution SparkSQL Interpreter, Calculation Spark Code SUCCESS, Result =
+group person
+g1 [moon,33]
+g2 [sun,11]
+
+ INFO [2019-11-25 19:13:06,006] ({main} InterpreterProcess.java[<init>]:51) - Interpreter test result: true
+ INFO [2019-11-25 19:13:06,009] ({shutdown-hook-0} Logging.scala[logInfo]:54) - Invoking stop() from shutdown hook
+
+```
+
## Debug Submarine Spark Interpreter
### Execute debug command
```
-java -jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id
+java -jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 submarine-spark-interpreter-{{version}}-shade.jar spark spark-interpreter-id
```
Connect via remote debugging in IDEA
diff --git a/submarine-workbench/interpreter/spark-interpreter/pom.xml b/submarine-workbench/interpreter/spark-interpreter/pom.xml
index d9a9b05..4388173 100644
--- a/submarine-workbench/interpreter/spark-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/spark-interpreter/pom.xml
@@ -142,6 +142,21 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>spark-scala-2.10</artifactId>
+ <version>${zeppelin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>spark-scala-2.12</artifactId>
+ <version>${zeppelin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>spark-scala-2.11</artifactId>
+ <version>${zeppelin.version}</version>
+ </dependency>
<!-- Test libraries -->
<dependency>
@@ -162,17 +177,18 @@
<version>${zeppelin.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.zeppelin</groupId>
- <artifactId>spark-scala-2.11</artifactId>
- <version>0.9.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
-
</dependencies>
<build>
<finalName>submarine-${artifactId}-${project.version}</finalName>
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ </testResource>
+ <testResource>
+ <directory>${project.basedir}/src/main/resources</directory>
+ </testResource>
+ </testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -233,32 +249,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
-
- <executions>
- <execution>
- <id>copy-dependencies-runtime</id>
- <phase>validate</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <includeArtifactIds>spark-scala-2.11</includeArtifactIds>
- <includeGroupIds>org.apache.zeppelin</includeGroupIds>
- <outputDirectory>../spark/scala-2.11</outputDirectory>
- </configuration>
- </execution>
- <execution>
- <id>copy-dependencies-system</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <includeScope>system</includeScope>
- <excludeTransitive>true</excludeTransitive>
- </configuration>
- </execution>
- </executions>
</plugin>
</plugins>
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
index 40ee160..132ef60 100644
--- a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
+++ b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
@@ -18,158 +18,76 @@
*/
package org.apache.submarine.interpreter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.spark.SparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
-public class SparkInterpreter extends InterpreterProcess {
+public class SparkInterpreter extends AbstractInterpreter {
private static final Logger LOG = LoggerFactory.getLogger(SparkInterpreter.class);
- private org.apache.zeppelin.spark.SparkInterpreter zpleSparkInterpreter;
- private InterpreterContext intpContext;
-
- private String extractScalaVersion() throws InterpreterException {
- String scalaVersionString = scala.util.Properties.versionString();
- LOG.info("Using Scala: " + scalaVersionString);
- if (scalaVersionString.contains("version 2.10")) {
- return "2.10";
- } else if (scalaVersionString.contains("version 2.11")) {
- return "2.11";
- } else if (scalaVersionString.contains("version 2.12")) {
- return "2.12";
- } else {
- throw new InterpreterException("Unsupported scala version: " + scalaVersionString);
- }
- }
-
public SparkInterpreter(Properties properties) {
- properties = mergeZeplSparkIntpProp(properties);
- zpleSparkInterpreter = new org.apache.zeppelin.spark.SparkInterpreter(properties);
- zpleSparkInterpreter.setInterpreterGroup(new InterpreterGroup());
- intpContext = this.getIntpContext();
+ properties = mergeZeppelinInterpreterProperties(properties);
+ this.zeppelinInterpreter = new org.apache.zeppelin.spark.SparkInterpreter(properties);
+ this.setInterpreterGroup(new InterpreterGroup());
}
+
public SparkInterpreter() {
this(new Properties());
}
@Override
- public void open() {
+ public boolean test() {
try {
- ClassLoader scalaInterpreterClassLoader = null;
- String submarineHome = System.getenv("SUBMARINE_HOME");
- String interpreterDir = "";
- if (StringUtils.isBlank(submarineHome)) {
- LOG.warn("SUBMARINE_HOME is not set, default interpreter directory is ../ ");
- interpreterDir = "..";
- } else {
- interpreterDir = submarineHome + "/workbench/interpreter";
+ open();
+ String code = "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+ "df.show()";
+ InterpreterResult result = interpret(code);
+ LOG.info("Execution Spark Interpreter, Calculation Spark Code {}, Result = {}",
+ code, result.message().get(0).getData()
+ );
+
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ close();
+ return false;
}
- String scalaVersion = extractScalaVersion();
- File scalaJarFolder = new File(interpreterDir + "/spark/scala-" + scalaVersion);
- List<URL> urls = new ArrayList<>();
- for (File file : scalaJarFolder.listFiles()) {
- LOG.info("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
- + scalaJarFolder);
- urls.add(file.toURI().toURL());
- }
- scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
- Thread.currentThread().getContextClassLoader());
- if (scalaInterpreterClassLoader != null) {
- Thread.currentThread().setContextClassLoader(scalaInterpreterClassLoader);
- }
- zpleSparkInterpreter.open();
+ boolean success = (result.message().get(0).getData().contains(
+ "+---+----+\n" +
+ "| _1| _2|\n" +
+ "+---+----+\n" +
+ "| 1| a|\n" +
+ "| 2|null|\n" +
+ "+---+----+")
+ );
+ close();
+ return success;
} catch (InterpreterException e) {
LOG.error(e.getMessage(), e);
- } catch (MalformedURLException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- @Override
- public InterpreterResult interpret(String code) {
- InterpreterResult interpreterResult = null;
- try {
- org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
- = zpleSparkInterpreter.interpret(code, intpContext);
- interpreterResult = new InterpreterResult(zeplInterpreterResult);
-
- List<InterpreterResultMessage> interpreterResultMessages =
- intpContext.out.toInterpreterResultMessage();
-
- for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
- interpreterResult.add(message);
- }
- } catch (InterpreterException | IOException e) {
- LOG.error(e.getMessage(), e);
+ return false;
}
-
- return interpreterResult;
}
- @Override
- public void close() {
- try {
- zpleSparkInterpreter.close();
- } catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
- }
+ public SparkContext getSparkContext() {
+ return ((org.apache.zeppelin.spark.SparkInterpreter) this.zeppelinInterpreter).getSparkContext();
}
- @Override
- public void cancel() {
- try {
- zpleSparkInterpreter.cancel(intpContext);
- } catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
- }
+ public void setSchedulerPool(String pool) {
+ this.getIntpContext().getLocalProperties().put("pool", pool);
}
@Override
- public int getProgress() {
- int process = 0;
- try {
- process = zpleSparkInterpreter.getProgress(intpContext);
- } catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
- }
-
- return process;
- }
+ protected Properties mergeZeppelinInterpreterProperties(Properties properties) {
+ properties = super.mergeZeppelinInterpreterProperties(properties);
+ Properties defaultProperties = new Properties();
- @Override
- public boolean test() {
- open();
- String code = "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
- "df.show()";
- InterpreterResult result = interpret(code);
- LOG.info("Execution Spark Interpreter, Calculation Spark Code {}, Result = {}",
- code, result.message().get(0).getData());
+ defaultProperties.setProperty("zeppelin.spark.maxResult", "1000");
+ defaultProperties.setProperty("zeppelin.spark.scala.color", "false");
- if (result.code() != InterpreterResult.Code.SUCCESS) {
- close();
- return false;
+ if (null != properties) {
+ defaultProperties.putAll(properties);
}
- boolean success = (result.message().get(0).getData().contains(
- "+---+----+\n" +
- "| _1| _2|\n" +
- "+---+----+\n" +
- "| 1| a|\n" +
- "| 2|null|\n" +
- "+---+----+"));
- close();
- return success;
+ return defaultProperties;
}
+
}
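Reviewer note on mergeZeppelinInterpreterProperties above: the method follows a defaults-then-overrides pattern, where the defaults are set first and putAll then lets any caller-supplied key win. A minimal stand-alone sketch of that pattern follows. PropertiesMergeSketch and mergeWithDefaults are hypothetical names, and the call to the superclass method is left out because its body is not shown in this diff.

```java
import java.util.Properties;

// Hypothetical stand-alone sketch; not part of the Submarine code base.
class PropertiesMergeSketch {

  // Start from the defaults, then let caller-supplied values overwrite them.
  static Properties mergeWithDefaults(Properties userProperties) {
    Properties merged = new Properties();
    merged.setProperty("zeppelin.spark.maxResult", "1000");
    merged.setProperty("zeppelin.spark.scala.color", "false");
    if (userProperties != null) {
      // putAll replaces the value of any key that is already present.
      merged.putAll(userProperties);
    }
    return merged;
  }

  public static void main(String[] args) {
    Properties user = new Properties();
    user.setProperty("zeppelin.spark.maxResult", "100");
    user.setProperty("spark.master", "local");

    Properties merged = mergeWithDefaults(user);
    System.out.println(merged.getProperty("zeppelin.spark.maxResult"));   // 100 (override wins)
    System.out.println(merged.getProperty("zeppelin.spark.scala.color")); // false (default kept)
    System.out.println(merged.getProperty("spark.master"));               // local (passed through)
  }
}
```

Copying into a fresh Properties object, rather than mutating the argument, leaves the caller's instance untouched, which matches what the method in the diff does.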
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkSqlInterpreter.java b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkSqlInterpreter.java
new file mode 100644
index 0000000..a9732cb
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkSqlInterpreter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.Properties;
+
+public class SparkSqlInterpreter extends AbstractInterpreter {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkSqlInterpreter.class);
+
+ public SparkSqlInterpreter(Properties properties) {
+ properties = mergeZeppelinInterpreterProperties(properties);
+ this.zeppelinInterpreter = new org.apache.zeppelin.spark.SparkSqlInterpreter(properties);
+ }
+
+ public SparkSqlInterpreter() {
+ this(new Properties());
+ }
+
+
+ @Override
+ public boolean test() {
+ try {
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ SparkInterpreter sparkInterpreter = new SparkInterpreter();
+ sparkInterpreter.setInterpreterGroup(intpGroup);
+
+ this.setInterpreterGroup(intpGroup);
+
+ String session = "session_1";
+ intpGroup.put(session, new LinkedList<>());
+ sparkInterpreter.addToSession(session);
+ this.addToSession(session);
+
+ sparkInterpreter.open();
+ open();
+
+ sparkInterpreter.interpret("case class Person(name:String, age:Int)");
+ sparkInterpreter.interpret("case class People(group:String, person:Person)");
+ sparkInterpreter.interpret(
+ "val gr = sc.parallelize(Seq(" +
+ "People(\"g1\", " +
+ "Person(\"moon\",33)), " +
+ "People(\"g2\", " +
+ "Person(\"sun\",11" +
+ "))))");
+ sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")");
+
+ InterpreterResult result = interpret("select * from gr");
+ LOG.info("Execution SparkSQL Interpreter, Calculation Spark Code {}, Result =\n {}",
+ result.code(), result.message().get(0).getData()
+ );
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ return false;
+ }
+ if (!result.message().get(0).getData().contains("[moon,33]") ||
+ !result.message().get(0).getData().contains("[sun,11]")) {
+ return false;
+ }
+ return true;
+ } catch (InterpreterException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ }
+ }
+
+ public void setResultLimits(long number) {
+ getIntpContext().getLocalProperties().put("limit", String.valueOf(number));
+ }
+}
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
index 9672e21..3a2f8b6 100644
--- a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
@@ -17,65 +17,42 @@
package org.apache.submarine.interpreter;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.fail;
public class SparkInterpreterTest {
private SparkInterpreter interpreter;
// catch the streaming output in onAppend
private volatile String output = "";
- // catch the interpreter output in onUpdate
- private InterpreterResultMessageOutput messageOutput;
-
- private RemoteInterpreterEventClient mockRemoteEventClient;
@Before
public void setUp() {
- mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+ // SparkInterpreter will change the current thread's classLoader
+ Thread.currentThread().setContextClassLoader(SparkInterpreterTest.class.getClassLoader());
}
@Test
- public void testSparkInterpreter() throws InterruptedException {
+ public void testSparkInterpreter() throws InterruptedException, InterpreterException {
Properties properties = new Properties();
properties.setProperty("spark.master", "local");
properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.test", "true");
- properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+ properties.setProperty("submarine.spark.scala.color", "false");
+ properties.setProperty("submarine.spark.test", "true");
+ properties.setProperty("submarine.spark.uiWebUrl", "fake_spark_weburl");
// disable color output for easy testing
- properties.setProperty("zeppelin.spark.scala.color", "false");
- properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
-
- InterpreterContext context = InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mockRemoteEventClient)
- .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
- .build();
- InterpreterContext.set(context);
-
- interpreter = new SparkInterpreter();
- try {
- interpreter.open();
- } catch (Throwable ex) {
- ex.printStackTrace();
- }
+ properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
+ interpreter = new SparkInterpreter(properties);
+
+ interpreter.open();
InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -126,24 +103,23 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// Companion object with case class
- result = interpreter.interpret("import scala.math._\n" +
- "object Circle {\n" +
- " private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" +
- "}\n" +
- "case class Circle(radius: Double) {\n" +
- " import Circle._\n" +
- " def area: Double = calculateArea(radius)\n" +
- "}\n" +
- "\n" +
- "val circle1 = new Circle(5.0)");
+ result = interpreter.interpret(
+ "import scala.math._\n" +
+ "object Circle {\n" +
+ "private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" +
+ "}\n" +
+ "case class Circle(radius: Double) {\n" +
+ " import Circle._\n" +
+ " def area: Double = calculateArea(radius)\n" +
+ "}\n" +
+ "\n" +
+ "val circle1 = new Circle(5.0)");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// class extend
result = interpreter.interpret("import java.util.ArrayList");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- context = getInterpreterContext();
- context.setParagraphId("pid_1");
result = interpreter.interpret("sc\n.range(1, 10)\n.sum");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("45"));
@@ -154,15 +130,15 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
result = interpreter.interpret(
- "case class Bank(age:Integer, job:String, marital : String, edu : String, balance : Integer)\n" +
- "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
- " s => Bank(s(0).toInt, \n" +
- " s(1).replaceAll(\"\\\"\", \"\"),\n" +
- " s(2).replaceAll(\"\\\"\", \"\"),\n" +
- " s(3).replaceAll(\"\\\"\", \"\"),\n" +
- " s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
- " )\n" +
- ").toDF()");
+ "case class Bank(age:Integer, job:String, marital : String, edu : String, balance : Integer)\n" +
+ "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
+ " s => Bank(s(0).toInt, \n" +
+ " s(1).replaceAll(\"\\\"\", \"\"),\n" +
+ " s(2).replaceAll(\"\\\"\", \"\"),\n" +
+ " s(3).replaceAll(\"\\\"\", \"\"),\n" +
+ " s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+ " )\n" +
+ ").toDF()");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// spark version
@@ -176,11 +152,11 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
result = interpreter.interpret(
- "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
- "df.show()");
+ "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+ "df.show()");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains(
- "+---+----+\n" +
+ "+---+----+\n" +
"| _1| _2|\n" +
"+---+----+\n" +
"| 1| a|\n" +
@@ -191,11 +167,11 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
result = interpreter.interpret(
- "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
- "df.show()");
+ "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+ "df.show()");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains(
- "+---+----+\n" +
+ "+---+----+\n" +
"| _1| _2|\n" +
"+---+----+\n" +
"| 1| a|\n" +
@@ -203,7 +179,7 @@ public class SparkInterpreterTest {
"+---+----+"));
}
- // ZeppelinContext
+ // submarineContext
result = interpreter.interpret("z.show(df)");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
@@ -213,15 +189,16 @@ public class SparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// getProgress;
- Thread interpretThread = new Thread() {
- @Override
- public void run() {
- InterpreterResult result = null;
- result = interpreter.interpret(
- "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))");
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ Thread interpretThread = new Thread(() -> {
+ try {
+ InterpreterResult result1 = interpreter.interpret(
+ "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))");
+ assertEquals(InterpreterResult.Code.SUCCESS, result1.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ fail();
}
- };
+ });
interpretThread.start();
boolean nonZeroProgress = false;
int progress = 0;
@@ -235,16 +212,17 @@ public class SparkInterpreterTest {
}
assertTrue(nonZeroProgress);
- interpretThread = new Thread() {
- @Override
- public void run() {
- InterpreterResult result = null;
- result = interpreter.interpret(
- "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))");
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().contains("cancelled"));
+ interpretThread = new Thread(() -> {
+ try {
+ InterpreterResult result12 = interpreter.interpret(
+ "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))");
+ assertEquals(InterpreterResult.Code.ERROR, result12.code());
+ assertTrue(result12.message().get(0).getData().contains("cancelled"));
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ fail();
}
- };
+ });
interpretThread.start();
// sleep 1 second to wait for the spark job start
@@ -254,19 +232,18 @@ public class SparkInterpreterTest {
}
@Test
- public void testDisableReplOutput() {
+ public void testDisableReplOutput() throws InterpreterException {
Properties properties = new Properties();
properties.setProperty("spark.master", "local");
properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.test", "true");
- properties.setProperty("zeppelin.spark.printREPLOutput", "false");
+ properties.setProperty("submarine.spark.maxResult", "100");
+ properties.setProperty("submarine.spark.test", "true");
+ properties.setProperty("submarine.spark.printREPLOutput", "false");
// disable color output for easy testing
- properties.setProperty("zeppelin.spark.scala.color", "false");
- properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+ properties.setProperty("submarine.spark.scala.color", "false");
+ properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
- InterpreterContext.set(getInterpreterContext());
- interpreter = new SparkInterpreter();
+ interpreter = new SparkInterpreter(properties);
interpreter.open();
InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
@@ -281,42 +258,41 @@ public class SparkInterpreterTest {
}
@Test
- public void testSchedulePool() {
+ public void testSchedulePool() throws InterpreterException {
Properties properties = new Properties();
properties.setProperty("spark.master", "local");
properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.test", "true");
+ properties.setProperty("submarine.spark.maxResult", "100");
+ properties.setProperty("submarine.spark.test", "true");
properties.setProperty("spark.scheduler.mode", "FAIR");
// disable color output for easy testing
- properties.setProperty("zeppelin.spark.scala.color", "false");
- properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+ properties.setProperty("submarine.spark.scala.color", "false");
+ properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
+ properties.setProperty("spark.scheduler.pool", "pool1");
- interpreter = new SparkInterpreter();
- InterpreterContext.set(getInterpreterContext());
+ interpreter = new SparkInterpreter(properties);
+ interpreter.setSchedulerPool("pool1");
interpreter.open();
InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
- // pool is reset to null if user don't specify it via paragraph properties
- result = interpreter.interpret("sc.range(1, 10).sum");
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("pool1", interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
}
// spark.ui.enabled: false
@Test
- public void testDisableSparkUI_1() {
+ public void testDisableSparkUI_1() throws InterpreterException {
Properties properties = new Properties();
properties.setProperty("spark.master", "local");
properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.test", "true");
+ properties.setProperty("submarine.spark.maxResult", "100");
+ properties.setProperty("submarine.spark.test", "true");
properties.setProperty("spark.ui.enabled", "false");
// disable color output for easy testing
- properties.setProperty("zeppelin.spark.scala.color", "false");
- properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+ properties.setProperty("submarine.spark.scala.color", "false");
+ properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
- interpreter = new SparkInterpreter();
- InterpreterContext.set(getInterpreterContext());
+ interpreter = new SparkInterpreter(properties);
interpreter.open();
InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
@@ -324,21 +300,20 @@ public class SparkInterpreterTest {
}
- // zeppelin.spark.ui.hidden: true
+ // submarine.spark.ui.hidden: true
@Test
- public void testDisableSparkUI_2() {
+ public void testDisableSparkUI_2() throws InterpreterException {
Properties properties = new Properties();
properties.setProperty("spark.master", "local");
properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.test", "true");
- properties.setProperty("zeppelin.spark.ui.hidden", "true");
+ properties.setProperty("submarine.spark.maxResult", "100");
+ properties.setProperty("submarine.spark.test", "true");
+ properties.setProperty("submarine.spark.ui.hidden", "true");
// disable color output for easy testing
- properties.setProperty("zeppelin.spark.scala.color", "false");
- properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+ properties.setProperty("submarine.spark.scala.color", "false");
+ properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
- interpreter = new SparkInterpreter();
- InterpreterContext.set(getInterpreterContext());
+ interpreter = new SparkInterpreter(properties);
interpreter.open();
InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
@@ -347,41 +322,9 @@ public class SparkInterpreterTest {
@After
- public void tearDown() {
+ public void tearDown() throws InterpreterException {
if (this.interpreter != null) {
this.interpreter.close();
}
}
-
- private InterpreterContext getInterpreterContext() {
- output = "";
- InterpreterContext context = InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mockRemoteEventClient)
- .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
- .build();
- context.out = new InterpreterOutput(
- new InterpreterOutputListener() {
- @Override
- public void onUpdateAll(InterpreterOutput out) {
-
- }
-
- @Override
- public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
- try {
- output = out.toInterpreterResultMessage().getData();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onUpdate(int index, InterpreterResultMessageOutput out) {
- messageOutput = out;
- }
- }
- );
- return context;
- }
}
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkSqlInterpreterTest.java b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkSqlInterpreterTest.java
new file mode 100644
index 0000000..ea3d9a1
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkSqlInterpreterTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkSqlInterpreterTest {
+
+ private static SparkSqlInterpreter sqlInterpreter;
+ private static SparkInterpreter sparkInterpreter;
+
+ @BeforeClass
+ public static void setUp() throws InterpreterException {
+ Properties p = new Properties();
+ p.setProperty("spark.master", "local[4]");
+ p.setProperty("spark.app.name", "test");
+ p.setProperty("submarine.spark.maxResult", "10");
+ p.setProperty("submarine.spark.concurrentSQL", "true");
+ p.setProperty("submarine.spark.sql.stacktrace", "true");
+ p.setProperty("submarine.spark.useHiveContext", "true");
+ p.setProperty("submarine.spark.deprecatedMsg.show", "false");
+
+ sqlInterpreter = new SparkSqlInterpreter(p);
+
+ InterpreterGroup intpGroup = new InterpreterGroup();
+ sparkInterpreter = new SparkInterpreter(p);
+ sparkInterpreter.setInterpreterGroup(intpGroup);
+
+ sqlInterpreter = new SparkSqlInterpreter(p);
+ sqlInterpreter.setInterpreterGroup(intpGroup);
+
+ String session = "session_1";
+ intpGroup.put(session, new LinkedList<>());
+ sparkInterpreter.addToSession(session);
+ sqlInterpreter.addToSession(session);
+
+ //SparkInterpreter will change the current thread's classLoader
+ Thread.currentThread().setContextClassLoader(SparkInterpreterTest.class.getClassLoader());
+
+ sparkInterpreter.open();
+ sqlInterpreter.open();
+ }
+
+ @AfterClass
+ public static void tearDown() throws InterpreterException {
+ sqlInterpreter.close();
+ }
+
+ @Test
+ public void test() throws InterpreterException {
+ sparkInterpreter.interpret("case class Test(name:String, age:Int)");
+ sparkInterpreter.interpret(
+ "val test = sc.parallelize(Seq(" +
+ " Test(\"moon\", 33)," +
+ " Test(\"jobs\", 51)," +
+ " Test(\"gates\", 51)," +
+ " Test(\"park\", 34)" +
+ "))");
+ sparkInterpreter.interpret("test.toDF.registerTempTable(\"test\")");
+
+ InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40");
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(org.apache.submarine.interpreter.InterpreterResult.Type.TABLE,
+ ret.message().get(0).getType()
+ );
+ assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
+
+ ret = sqlInterpreter.interpret("select wrong syntax");
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertTrue(ret.message().get(0).getData().length() > 0);
+
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ sqlInterpreter.interpret("select case when name='aa' then name else name end from test")
+ .code()
+ );
+ }
+
+ @Test
+ public void testStruct() throws InterpreterException {
+ sparkInterpreter.interpret("case class Person(name:String, age:Int)");
+ sparkInterpreter.interpret("case class People(group:String, person:Person)");
+ sparkInterpreter.interpret(
+ "val gr = sc.parallelize(Seq(" +
+ "People(\"g1\", " +
+ "Person(\"moon\",33)), " +
+ "People(\"g2\", " +
+ "Person(\"sun\",11" +
+ "))))");
+ sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")");
+
+ InterpreterResult ret = sqlInterpreter.interpret("select * from gr");
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ assertTrue(ret.message().get(0).getData().contains("[moon,33]"));
+ assertTrue(ret.message().get(0).getData().contains("[sun,11]"));
+ }
+
+
+ @Test
+ public void testMaxResults() throws InterpreterException {
+ sparkInterpreter.interpret("case class P(age:Int)");
+ sparkInterpreter.interpret(
+ "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))");
+ sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")");
+
+ InterpreterResult ret = sqlInterpreter.interpret("select * from gr");
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ // expect 10 data rows plus 1 header row
+ assertEquals(11, ret.message().get(0).getData().split("\n").length);
+ assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+
+ // test limit local property
+ sqlInterpreter.setResultLimits(5);
+ ret = sqlInterpreter.interpret("select * from gr");
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ // expect 5 data rows plus 1 header row
+ assertEquals(6, ret.message().get(0).getData().split("\n").length);
+ }
+
+ @Test
+ public void testConcurrentSQL() throws InterruptedException, InterpreterException {
+
+ sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})");
+
+
+ Thread thread1 = new Thread(() -> {
+ try {
+ InterpreterResult result = sqlInterpreter.interpret("select sleep(10)");
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ });
+
+ Thread thread2 = new Thread(() -> {
+ try {
+ InterpreterResult result = sqlInterpreter.interpret("select sleep(10)");
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ });
+
+ // Start two Spark SQL queries that each sleep for 10 seconds. If they run concurrently,
+ // the total running time is less than 20 seconds.
+ long start = System.currentTimeMillis();
+ thread1.start();
+ thread2.start();
+ thread1.join();
+ thread2.join();
+ long end = System.currentTimeMillis();
+ assertTrue("running time must be less than 20 seconds", ((end - start) / 1000) < 20);
+
+ }
+
+ @Test
+ public void testDDL() throws InterpreterException {
+ InterpreterResult ret = sqlInterpreter.interpret("create table t1(id int, name string)");
+ assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+ // spark 1.x will still return DataFrame with non-empty columns.
+ // org.apache.spark.sql.DataFrame = [result: string]
+
+ assertTrue(ret.message().isEmpty());
+
+ // create the same table again
+ ret = sqlInterpreter.interpret("create table t1(id int, name string)");
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertEquals(1, ret.message().size());
+ assertEquals(InterpreterResult.Type.TEXT,
+ ret.message().get(0).getType()
+ );
+ assertTrue(ret.message().get(0).getData().contains("already exists"));
+
+ // invalid DDL
+ ret = sqlInterpreter.interpret("create temporary function udf1 as 'org.apache.submarine.UDF'");
+ assertEquals(InterpreterResult.Code.ERROR, ret.code());
+ assertEquals(1, ret.message().size());
+ assertEquals(InterpreterResult.Type.TEXT,
+ ret.message().get(0).getType()
+ );
+
+ }
+}
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/resources/hive-site.xml b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..e38c7be
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/hive-site.xml
@@ -0,0 +1,7 @@
+<configuration>
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>${user.home}/hive/warehouse</value>
+ <description>location of default database for the warehouse</description>
+ </property>
+</configuration>
\ No newline at end of file
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/resources/log4j.properties b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d7a8623
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+submarine.log.threshold=ALL
+submarine.root.logger=INFO, console
+
+log4j.rootLogger=${submarine.root.logger}
+
+# Logging Threshold
+log4j.threshold=${submarine.log.threshold}
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.console.encoding=UTF-8
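
Note on testConcurrentSQL in SparkSqlInterpreterTest above: the test infers concurrency from wall-clock time (two 10-second queries finishing in under 20 seconds). The following is a minimal, self-contained sketch of that timing technique and is not part of the commit. It assumes Thread.sleep as a stand-in for sqlInterpreter.interpret("select sleep(10)"), and the class and method names (ConcurrencyTimingSketch, runConcurrently) are hypothetical. It also collects results through Future.get(), because an assertion that fails inside a raw Thread is not reported back to the calling test thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; none of these names exist in the Submarine code base.
final class ConcurrencyTimingSketch {

  /** Runs `tasks` sleeping jobs in parallel and returns the elapsed wall-clock milliseconds. */
  static long runConcurrently(int tasks, long sleepMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(tasks);
    try {
      List<Callable<Long>> jobs = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        jobs.add(() -> {
          // Assumption: stands in for sqlInterpreter.interpret("select sleep(10)").
          Thread.sleep(sleepMillis);
          return sleepMillis;
        });
      }
      long start = System.nanoTime();
      // invokeAll blocks until every job has completed.
      for (Future<Long> done : pool.invokeAll(jobs)) {
        // get() rethrows a worker failure, so it cannot be silently swallowed.
        done.get();
      }
      return (System.nanoTime() - start) / 1_000_000L;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("worker failed", e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    long elapsed = runConcurrently(2, 300L);
    System.out.println("elapsed ms: " + elapsed);
    // Two 300 ms jobs that overlap should finish well before the 600 ms serial total.
    if (elapsed >= 600L) {
      throw new AssertionError("jobs did not overlap, elapsed ms: " + elapsed);
    }
  }
}
```

A timing bound like this only shows that the jobs overlapped; it can still be flaky on a heavily loaded machine, so the margin between the parallel and the serial time should be generous.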