You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/12/26 03:20:08 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5170]. Support spark-submit in spark interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 99fe4f7  [ZEPPELIN-5170]. Support spark-submit in spark interpreter
99fe4f7 is described below

commit 99fe4f790b09fce8c097d90e1f3df15492060410
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Dec 22 16:57:11 2020 +0800

    [ZEPPELIN-5170]. Support spark-submit in spark interpreter
    
    ### What is this PR for?
    
    This PR is to support `%spark.submit` to spark-submit command so that user can run spark jar jobs in zeppelin. This is a sub interpreter of spark. Internally it runs shell interpreter for spark-submit command.
    
    ### What type of PR is it?
    [Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5170
    
    ### How should this be tested?
    * Manually tested and integration test is added
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/102894878-b9f08000-449e-11eb-99ce-6eb2cc6ca80d.png)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3998 from zjffdu/ZEPPELIN-5170 and squashes the following commits:
    
    8c67c6b8d [Jeff Zhang] [ZEPPELIN-5170]. Support spark-submit in spark interpreter
    
    (cherry picked from commit a17c2eade5ecbd22b73f605009ae095ae7c0ecdb)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 spark/interpreter/pom.xml                          |   6 ++
 .../zeppelin/spark/SparkSubmitInterpreter.java     | 116 +++++++++++++++++++++
 .../src/main/resources/interpreter-setting.json    |  13 +++
 .../zeppelin/integration/SparkIntegrationTest.java |   7 ++
 .../zeppelin/interpreter/InterpreterOutput.java    |  33 +++---
 .../zeppelin/interpreter/InterpreterResult.java    |   2 +-
 .../remote/RemoteInterpreterServer.java            |   2 +-
 .../interpreter/InterpreterOutputTest.java         |   4 +-
 .../org/apache/zeppelin/server/ZeppelinServer.java |   4 +-
 9 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 60c8182..da30383 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -112,6 +112,12 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-shell</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSubmitInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSubmitInterpreter.java
new file mode 100644
index 0000000..d7b26c6
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSubmitInterpreter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.shell.ShellInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Support %spark.submit which run spark-submit command. Internally,
+ * it would run shell command via ShellInterpreter.
+ *
+ */
+public class SparkSubmitInterpreter extends ShellInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitInterpreter.class);
+
+  private String sparkHome;
+
+  public SparkSubmitInterpreter(Properties property) {
+    super(property);
+    // Set time to be max integer so that the shell process won't timeout.
+    setProperty("shell.command.timeout.millisecs", Integer.MAX_VALUE + "");
+    this.sparkHome = properties.getProperty("SPARK_HOME");
+    LOGGER.info("SPARK_HOME: " + sparkHome);
+  }
+
+  @Override
+  public InterpreterResult internalInterpret(String cmd, InterpreterContext context) {
+    String sparkSubmitCommand = sparkHome + "/bin/spark-submit " + cmd.trim();
+    LOGGER.info("Run spark command: " + sparkSubmitCommand);
+    context.out.addInterpreterOutListener(new SparkSubmitOutputListener(context));
+    return super.internalInterpret(sparkSubmitCommand, context);
+  }
+
+  /**
+   * InterpreterOutputListener which extract spark ui link from logs.
+   */
+  private static class SparkSubmitOutputListener implements InterpreterOutputListener  {
+
+    private InterpreterContext context;
+    private boolean isSparkUrlSent = false;
+
+    public SparkSubmitOutputListener(InterpreterContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public void onUpdateAll(InterpreterOutput out) {
+
+    }
+
+    @Override
+    public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+      String text = new String(line);
+      if (isSparkUrlSent) {
+        return;
+      }
+      if (text.contains("tracking URL:")) {
+        // yarn mode, extract yarn proxy url as spark ui link
+        buildSparkUIInfo(text, context);
+        isSparkUrlSent = true;
+      } else if (text.contains("Bound SparkUI to")) {
+        // other mode, extract the spark ui link
+        buildSparkUIInfo(text, context);
+        isSparkUrlSent = true;
+      }
+    }
+
+    private void buildSparkUIInfo(String log, InterpreterContext context) {
+      int pos = log.lastIndexOf(" ");
+      if (pos != -1) {
+        String sparkUI = log.substring(pos + 1);
+        Map<String, String> infos = new java.util.HashMap<String, String>();
+        infos.put("jobUrl", sparkUI);
+        infos.put("label", "Spark UI");
+        infos.put("tooltip", "View in Spark web UI");
+        infos.put("noteId", context.getNoteId());
+        infos.put("paraId", context.getParagraphId());
+        context.getIntpEventClient().onParaInfosReceived(infos);
+      } else {
+        LOGGER.error("Unable to extract spark url from this log: " + log);
+      }
+    }
+
+    @Override
+    public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
+    }
+  }
+}
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index 9d70f9c..8f28823 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -349,5 +349,18 @@
       "completionKey": "TAB",
       "completionSupport": false
     }
+  },
+
+  {
+    "group": "spark",
+    "name": "submit",
+    "className": "org.apache.zeppelin.spark.SparkSubmitInterpreter",
+    "properties": {
+    },
+    "editor": {
+      "language": "sh",
+      "editOnDblClick": false,
+      "completionSupport": false
+    }
   }
 ]
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index b882307..50990ba 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -145,6 +145,13 @@ public abstract class SparkIntegrationTest {
     assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(interpreterResult.toString(), InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
     assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("eruptions waiting"));
+
+    // test SparkSubmitInterpreter
+    context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+    Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark.submit", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("test").createExecutionContext());
+    interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
   }
 
   @Test
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
index e7f28ed..4462635 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,7 +46,7 @@ public class InterpreterOutput extends OutputStream {
 
   ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 
-  private final InterpreterOutputListener flushListener;
+  private final List<InterpreterOutputListener> outputListeners = new ArrayList<>();
   private final InterpreterOutputChangeListener changeListener;
 
   private int size = 0;
@@ -58,17 +59,21 @@ public class InterpreterOutput extends OutputStream {
   // change static var to set interpreter output limit
   // limit will be applied to all InterpreterOutput object.
   // so we can expect the consistent behavior
-  public static int limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+  public static int LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+
+  public InterpreterOutput() {
+    changeListener = null;
+  }
 
   public InterpreterOutput(InterpreterOutputListener flushListener) {
-    this.flushListener = flushListener;
+    this.outputListeners.add(flushListener);
     changeListener = null;
   }
 
   public InterpreterOutput(InterpreterOutputListener flushListener,
                            InterpreterOutputChangeListener listener)
       throws IOException {
-    this.flushListener = flushListener;
+    this.outputListeners.add(flushListener);
     this.changeListener = listener;
   }
 
@@ -108,6 +113,10 @@ public class InterpreterOutput extends OutputStream {
     }
   }
 
+  public void addInterpreterOutListener(InterpreterOutputListener outputListener) {
+    this.outputListeners.add(outputListener);
+  }
+
   public InterpreterResultMessageOutputListener createInterpreterResultMessageOutputListener(
       final int index) {
 
@@ -116,15 +125,15 @@ public class InterpreterOutput extends OutputStream {
 
       @Override
       public void onAppend(InterpreterResultMessageOutput out, byte[] line) {
-        if (flushListener != null) {
-          flushListener.onAppend(idx, out, line);
+        for (InterpreterOutputListener outputListener : outputListeners) {
+          outputListener.onAppend(idx, out, line);
         }
       }
 
       @Override
       public void onUpdate(InterpreterResultMessageOutput out) {
-        if (flushListener != null) {
-          flushListener.onUpdate(idx, out);
+        for (InterpreterOutputListener outputListener : outputListeners) {
+          outputListener.onUpdate(idx, out);
         }
       }
     };
@@ -184,8 +193,8 @@ public class InterpreterOutput extends OutputStream {
   }
 
   private void updateAllResultMessages() {
-    if (flushListener != null) {
-      flushListener.onUpdateAll(this);
+    for (InterpreterOutputListener outputListener : outputListeners) {
+      outputListener.onUpdateAll(this);
     }
   }
 
@@ -206,12 +215,12 @@ public class InterpreterOutput extends OutputStream {
     synchronized (resultMessageOutputs) {
       currentOut = getCurrentOutput();
 
-      if (++size > limit) {
+      if (++size > LIMIT) {
         if (b == NEW_LINE_CHAR && currentOut != null) {
           InterpreterResult.Type type = currentOut.getType();
           if (type == InterpreterResult.Type.TEXT || type == InterpreterResult.Type.TABLE) {
             setType(InterpreterResult.Type.HTML);
-            getCurrentOutput().write(ResultMessages.getExceedsLimitSizeMessage(limit,
+            getCurrentOutput().write(ResultMessages.getExceedsLimitSizeMessage(LIMIT,
                 "ZEPPELIN_INTERPRETER_OUTPUT_LIMIT").getData().getBytes());
             truncated = true;
             return;
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 255b21e..2267399 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -85,7 +85,7 @@ public class InterpreterResult implements Serializable, JsonSerializable {
    * @param msg
    */
   public void add(String msg) {
-    InterpreterOutput out = new InterpreterOutput(null);
+    InterpreterOutput out = new InterpreterOutput();
     try {
       out.write(msg);
       out.flush();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 6b1e5d4..9645843 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -359,7 +359,7 @@ public class RemoteInterpreterServer extends Thread
 
         String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
         if (properties.containsKey("zeppelin.interpreter.output.limit")) {
-          InterpreterOutput.limit = Integer.parseInt(
+          InterpreterOutput.LIMIT = Integer.parseInt(
                   properties.get("zeppelin.interpreter.output.limit"));
         }
 
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
index 8158151..195bfcb 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java
@@ -166,7 +166,7 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
   @Test
   public void testTruncate() throws IOException {
     // output is truncated after the new line
-    InterpreterOutput.limit = 3;
+    InterpreterOutput.LIMIT = 3;
     out = new InterpreterOutput(this);
 
     // truncate text
@@ -189,7 +189,7 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
     assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
 
     // restore default
-    InterpreterOutput.limit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+    InterpreterOutput.LIMIT = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
   }
 
 
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 71ecc09..c3f225e 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -49,8 +49,6 @@ import java.util.Base64;
 import java.util.HashSet;
 import java.util.List;
 import java.util.EnumSet;
-import java.util.Objects;
-import java.util.stream.Stream;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.management.remote.JMXServiceURL;
@@ -149,7 +147,7 @@ public class ZeppelinServer extends ResourceConfig {
 
   @Inject
   public ZeppelinServer() {
-    InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
+    InterpreterOutput.LIMIT = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
 
     packages("org.apache.zeppelin.rest");
   }