You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/26 03:20:08 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5170]. Support
spark-submit in spark interpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 99fe4f7 [ZEPPELIN-5170]. Support spark-submit in spark interpreter
99fe4f7 is described below
commit 99fe4f790b09fce8c097d90e1f3df15492060410
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Dec 22 16:57:11 2020 +0800
[ZEPPELIN-5170]. Support spark-submit in spark interpreter
### What is this PR for?
This PR adds `%spark.submit`, which runs the `spark-submit` command so that users can run Spark jar jobs in Zeppelin. It is a sub-interpreter of the Spark interpreter group; internally it delegates to the shell interpreter to execute the `spark-submit` command.
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5170
### How should this be tested?
* Manually tested, and an integration test has been added
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/102894878-b9f08000-449e-11eb-99ce-6eb2cc6ca80d.png)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3998 from zjffdu/ZEPPELIN-5170 and squashes the following commits:
8c67c6b8d [Jeff Zhang] [ZEPPELIN-5170]. Support spark-submit in spark interpreter
(cherry picked from commit a17c2eade5ecbd22b73f605009ae095ae7c0ecdb)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
spark/interpreter/pom.xml | 6 ++
.../zeppelin/spark/SparkSubmitInterpreter.java | 116 +++++++++++++++++++++
.../src/main/resources/interpreter-setting.json | 13 +++
.../zeppelin/integration/SparkIntegrationTest.java | 7 ++
.../zeppelin/interpreter/InterpreterOutput.java | 33 +++---
.../zeppelin/interpreter/InterpreterResult.java | 2 +-
.../remote/RemoteInterpreterServer.java | 2 +-
.../interpreter/InterpreterOutputTest.java | 4 +-
.../org/apache/zeppelin/server/ZeppelinServer.java | 4 +-
9 files changed, 168 insertions(+), 19 deletions(-)
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 60c8182..da30383 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -112,6 +112,12 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-shell</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<exclusions>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSubmitInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSubmitInterpreter.java
new file mode 100644
index 0000000..d7b26c6
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSubmitInterpreter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.shell.ShellInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Support %spark.submit which run spark-submit command. Internally,
+ * it would run shell command via ShellInterpreter.
+ *
+ */
+public class SparkSubmitInterpreter extends ShellInterpreter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitInterpreter.class);
+
+ private String sparkHome;
+
+ public SparkSubmitInterpreter(Properties property) {
+ super(property);
+ // Set time to be max integer so that the shell process won't timeout.
+ setProperty("shell.command.timeout.millisecs", Integer.MAX_VALUE + "");
+ this.sparkHome = properties.getProperty("SPARK_HOME");
+ LOGGER.info("SPARK_HOME: " + sparkHome);
+ }
+
+ @Override
+ public InterpreterResult internalInterpret(String cmd, InterpreterContext context) {
+ String sparkSubmitCommand = sparkHome + "/bin/spark-submit " + cmd.trim();
+ LOGGER.info("Run spark command: " + sparkSubmitCommand);
+ context.out.addInterpreterOutListener(new SparkSubmitOutputListener(context));
+ return super.internalInterpret(sparkSubmitCommand, context);
+ }
+
+ /**
+ * InterpreterOutputListener which extract spark ui link from logs.
+ */
+ private static class SparkSubmitOutputListener implements InterpreterOutputListener {
+
+ private InterpreterContext context;
+ private boolean isSparkUrlSent = false;
+
+ public SparkSubmitOutputListener(InterpreterContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void onUpdateAll(InterpreterOutput out) {
+
+ }
+
+ @Override
+ public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+ String text = new String(line);
+ if (isSparkUrlSent) {
+ return;
+ }
+ if (text.contains("tracking URL:")) {
+ // yarn mode, extract yarn proxy url as spark ui link
+ buildSparkUIInfo(text, context);
+ isSparkUrlSent = true;
+ } else if (text.contains("Bound SparkUI to")) {
+ // other mode, extract the spark ui link
+ buildSparkUIInfo(text, context);
+ isSparkUrlSent = true;
+ }
+ }
+
+ private void buildSparkUIInfo(String log, InterpreterContext context) {
+ int pos = log.lastIndexOf(" ");
+ if (pos != -1) {
+ String sparkUI = log.substring(pos + 1);
+ Map<String, String> infos = new java.util.HashMap<String, String>();
+ infos.put("jobUrl", sparkUI);
+ infos.put("label", "Spark UI");
+ infos.put("tooltip", "View in Spark web UI");
+ infos.put("noteId", context.getNoteId());
+ infos.put("paraId", context.getParagraphId());
+ context.getIntpEventClient().onParaInfosReceived(infos);
+ } else {
+ LOGGER.error("Unable to extract spark url from this log: " + log);
+ }
+ }
+
+ @Override
+ public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
+ }
+ }
+}
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 9d70f9c..8f28823 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -349,5 +349,18 @@
"completionKey": "TAB",
"completionSupport": false
}
+ },
+
+ {
+ "group": "spark",
+ "name": "submit",
+ "className": "org.apache.zeppelin.spark.SparkSubmitInterpreter",
+ "properties": {
+ },
+ "editor": {
+ "language": "sh",
+ "editOnDblClick": false,
+ "completionSupport": false
+ }
}
]
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index b882307..50990ba 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -145,6 +145,13 @@ public abstract class SparkIntegrationTest {
assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(interpreterResult.toString(), InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("eruptions waiting"));
+
+ // test SparkSubmitInterpreter
+ context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+ Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark.submit", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
+ interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+
+ assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
}
@Test
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index e7f28ed..4462635 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -45,7 +46,7 @@ public class InterpreterOutput extends OutputStream {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- private final InterpreterOutputListener flushListener;
+ private final List<InterpreterOutputListener> outputListeners = new ArrayList<>();
private final InterpreterOutputChangeListener changeListener;
private int size = 0;
@@ -58,17 +59,21 @@ public class InterpreterOutput extends OutputStream {
// change static var to set interpreter output limit
// limit will be applied to all InterpreterOutput object.
// so we can expect the consistent behavior
- public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+ public static int LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+
+ public InterpreterOutput() {
+ changeListener = null;
+ }
public InterpreterOutput(InterpreterOutputListener flushListener) {
- this.flushListener = flushListener;
+ this.outputListeners.add(flushListener);
changeListener = null;
}
public InterpreterOutput(InterpreterOutputListener flushListener,
InterpreterOutputChangeListener listener)
throws IOException {
- this.flushListener = flushListener;
+ this.outputListeners.add(flushListener);
this.changeListener = listener;
}
@@ -108,6 +113,10 @@ public class InterpreterOutput extends OutputStream {
}
}
+ public void addInterpreterOutListener(InterpreterOutputListener outputListener) {
+ this.outputListeners.add(outputListener);
+ }
+
public InterpreterResultMessageOutputListener createInterpreterResultMessageOutputListener(
final int index) {
@@ -116,15 +125,15 @@ public class InterpreterOutput extends OutputStream {
@Override
public void onAppend(InterpreterResultMessageOutput out, byte[] line) {
- if (flushListener != null) {
- flushListener.onAppend(idx, out, line);
+ for (InterpreterOutputListener outputListener : outputListeners) {
+ outputListener.onAppend(idx, out, line);
}
}
@Override
public void onUpdate(InterpreterResultMessageOutput out) {
- if (flushListener != null) {
- flushListener.onUpdate(idx, out);
+ for (InterpreterOutputListener outputListener : outputListeners) {
+ outputListener.onUpdate(idx, out);
}
}
};
@@ -184,8 +193,8 @@ public class InterpreterOutput extends OutputStream {
}
private void updateAllResultMessages() {
- if (flushListener != null) {
- flushListener.onUpdateAll(this);
+ for (InterpreterOutputListener outputListener : outputListeners) {
+ outputListener.onUpdateAll(this);
}
}
@@ -206,12 +215,12 @@ public class InterpreterOutput extends OutputStream {
synchronized (resultMessageOutputs) {
currentOut = getCurrentOutput();
- if (++size > limit) {
+ if (++size > LIMIT) {
if (b == NEW_LINE_CHAR && currentOut != null) {
InterpreterResult.Type type = currentOut.getType();
if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
setType(InterpreterResult.Type.HTML);
- getCurrentOutput().write(ResultMessages.getExceedsLimitSizeMessage(limit,
+ getCurrentOutput().write(ResultMessages.getExceedsLimitSizeMessage(LIMIT,
"ZEPPELIN_INTERPRETER_OUTPUT_LIMIT").getData().getBytes());
truncated = true;
return;
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 255b21e..2267399 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -85,7 +85,7 @@ public class InterpreterResult implements Serializable, JsonSerializable {
* @param msg
*/
public void add(String msg) {
- InterpreterOutput out = new InterpreterOutput(null);
+ InterpreterOutput out = new InterpreterOutput();
try {
out.write(msg);
out.flush();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 6b1e5d4..9645843 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -359,7 +359,7 @@ public class RemoteInterpreterServer extends Thread
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
if (properties.containsKey("zeppelin.interpreter.output.limit")) {
- InterpreterOutput.limit = Integer.parseInt(
+ InterpreterOutput.LIMIT = Integer.parseInt(
properties.get("zeppelin.interpreter.output.limit"));
}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
index 8158151..195bfcb 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -166,7 +166,7 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
@Test
public void testTruncate() throws IOException {
// output is truncated after the new line
- InterpreterOutput.limit = 3;
+ InterpreterOutput.LIMIT = 3;
out = new InterpreterOutput(this);
// truncate text
@@ -189,7 +189,7 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
// restore default
- InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+ InterpreterOutput.LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 71ecc09..c3f225e 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -49,8 +49,6 @@ import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.EnumSet;
-import java.util.Objects;
-import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.management.remote.JMXServiceURL;
@@ -149,7 +147,7 @@ public class ZeppelinServer extends ResourceConfig {
@Inject
public ZeppelinServer() {
- InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
+ InterpreterOutput.LIMIT = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
packages("org.apache.zeppelin.rest");
}