You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pr...@apache.org on 2016/10/13 08:38:14 UTC

zeppelin git commit: ZEPPELIN-1430. Display appId and webui link in LivyInterpreter's output

Repository: zeppelin
Updated Branches:
  refs/heads/master 22bd85104 -> 5d5d758ce


ZEPPELIN-1430. Display appId and webui link in LivyInterpreter's output

### What is this PR for?
Currently, it is hard to figure out which YARN application a Livy session corresponds to. It would be better to display the appId and web UI link in the output of LivyInterpreter for diagnostic purposes. The same could be applied to the native SparkInterpreter, but that can be done in a separate ticket.

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1430

### How should this be tested?
Tested manually

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18463333/e4eab580-79bb-11e6-8c8d-393ab6805638.png)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #1426 from zjffdu/ZEPPELIN-1430 and squashes the following commits:

88009cb [Jeff Zhang] update doc
459f75e [Jeff Zhang] fix typo
eb7ec27 [Jeff Zhang] add flag to enable display app info in frontend
a087a1d [Jeff Zhang] ZEPPELIN-1430. Display appId and webui link in LivyInterpreter's output


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5d5d758c
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5d5d758c
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5d5d758c

Branch: refs/heads/master
Commit: 5d5d758ce6ad1320c536ec7caddc9acc36876990
Parents: 22bd851
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Oct 11 15:48:40 2016 +0800
Committer: Prabhjyot Singh <pr...@apache.org>
Committed: Thu Oct 13 14:07:49 2016 +0530

----------------------------------------------------------------------
 docs/interpreter/livy.md                        |  7 ++-
 .../org/apache/zeppelin/livy/LivyHelper.java    | 44 ++++++++-------
 .../apache/zeppelin/livy/LivyOutputStream.java  |  9 ++++
 .../zeppelin/livy/LivySparkInterpreter.java     | 56 +++++++++++++++++---
 .../src/main/resources/interpreter-setting.json |  5 ++
 5 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d5d758c/docs/interpreter/livy.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
index a0db622..d725304 100644
--- a/docs/interpreter/livy.md
+++ b/docs/interpreter/livy.md
@@ -65,7 +65,12 @@ Example: `spark.master` to `livy.spark.master`
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
   </tr>
-    <tr>
+  <tr>
+    <td>zeppelin.livy.displayAppInfo</td>
+    <td>false</td>
+    <td>Whether to display app info</td>
+  </tr>
+  <tr>
     <td>livy.spark.driver.cores</td>
     <td></td>
     <td>Driver cores. ex) 1, 2.</td>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d5d758c/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
index 0b09e28..ab0b499 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.livy;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -133,21 +134,23 @@ public class LivyHelper {
   public InterpreterResult interpretInput(String stringLines,
                                           final InterpreterContext context,
                                           final Map<String, Integer> userSessionMap,
-                                          LivyOutputStream out) {
+                                          LivyOutputStream out,
+                                          String appId,
+                                          String webUI,
+                                          boolean displayAppInfo) {
     try {
+      out.setInterpreterOutput(context.out);
+      context.out.clear();
+      String incomplete = "";
+      boolean inComment = false;
       String[] lines = stringLines.split("\n");
       String[] linesToRun = new String[lines.length + 1];
       for (int i = 0; i < lines.length; i++) {
         linesToRun[i] = lines[i];
       }
       linesToRun[lines.length] = "print(\"\")";
-
-      out.setInterpreterOutput(context.out);
-      context.out.clear();
       Code r = null;
-      String incomplete = "";
-      boolean inComment = false;
-
+      StringBuilder outputBuilder = new StringBuilder();
       for (int l = 0; l < linesToRun.length; l++) {
         String s = linesToRun[l];
         // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
@@ -196,7 +199,7 @@ public class LivyHelper {
         } else if (r == Code.INCOMPLETE) {
           incomplete += s + "\n";
         } else {
-          out.write((res.message() + "\n").getBytes(Charset.forName("UTF-8")));
+          outputBuilder.append(res.message() + "\n");
           incomplete = "";
         }
       }
@@ -205,10 +208,20 @@ public class LivyHelper {
         out.setInterpreterOutput(null);
         return new InterpreterResult(r, "Incomplete expression");
       } else {
+        if (displayAppInfo) {
+          out.write("%angular ");
+          out.write("<pre><code>");
+          out.write(outputBuilder.toString());
+          out.write("</code></pre>");
+          out.write("<hr/>");
+          out.write("Spark Application Id:" + appId + "<br/>");
+          out.write("Spark WebUI: <a href=" + webUI + ">" + webUI + "</a>");
+        } else {
+          out.write(outputBuilder.toString());
+        }
         out.setInterpreterOutput(null);
         return new InterpreterResult(Code.SUCCESS);
       }
-
     } catch (Exception e) {
       LOGGER.error("error in interpretInput", e);
       return new InterpreterResult(Code.ERROR, e.getMessage());
@@ -219,16 +232,6 @@ public class LivyHelper {
                                      final InterpreterContext context,
                                      final Map<String, Integer> userSessionMap)
       throws Exception {
-    stringLines = stringLines
-        //for "\n" present in string
-        .replaceAll("\\\\n", "\\\\\\\\n")
-        //for new line present in string
-        .replaceAll("\\n", "\\\\n")
-        // for \" present in string
-        .replaceAll("\\\\\"", "\\\\\\\\\"")
-        // for " present in string
-        .replaceAll("\"", "\\\\\"");
-
     if (stringLines.trim().equals("")) {
       return new InterpreterResult(Code.SUCCESS, "");
     }
@@ -295,7 +298,7 @@ public class LivyHelper {
             + userSessionMap.get(context.getAuthenticationInfo().getUser())
             + "/statements",
         "POST",
-        "{\"code\": \"" + lines + "\" }",
+        "{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
         context.getParagraphId());
     if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) {
       throw new Exception("Exception: Session not found, Livy server would have restarted, " +
@@ -340,6 +343,7 @@ public class LivyHelper {
 
   protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
       throws Exception {
+    LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
     RestTemplate restTemplate = getRestTemplate();
     HttpHeaders headers = new HttpHeaders();
     headers.add("Content-Type", "application/json");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d5d758c/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
index 6a525eb..a11634e 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyOutputStream.java
@@ -17,6 +17,8 @@
 package org.apache.zeppelin.livy;
 
 import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -25,6 +27,8 @@ import java.io.OutputStream;
  * InterpreterOutput can be attached / detached.
  */
 public class LivyOutputStream extends OutputStream {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(LivyOutputStream.class);
   InterpreterOutput interpreterOutput;
 
   public LivyOutputStream() {
@@ -52,6 +56,11 @@ public class LivyOutputStream extends OutputStream {
     }
   }
 
+  public void write(String text) throws IOException {
+    LOGGER.debug("livy output:" + text);
+    write(text.getBytes("UTF-8"));
+  }
+
   @Override
   public void write(byte[] b, int offset, int len) throws IOException {
     if (interpreterOutput != null) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d5d758c/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
index 95ee22e..9a9dd80 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
@@ -38,13 +38,20 @@ public class LivySparkInterpreter extends Interpreter {
   private LivyOutputStream out;
 
   protected static Map<String, Integer> userSessionMap;
+  protected static Map<Integer, String> sessionId2AppIdMap;
+  protected static Map<Integer, String> sessionId2WebUIMap;
+
   private LivyHelper livyHelper;
+  private boolean displayAppInfo;
 
   public LivySparkInterpreter(Properties property) {
     super(property);
     userSessionMap = new HashMap<>();
+    sessionId2AppIdMap = new HashMap<>();
+    sessionId2WebUIMap = new HashMap<>();
     livyHelper = new LivyHelper(property);
     out = new LivyOutputStream();
+    this.displayAppInfo = Boolean.parseBoolean(getProperty("zeppelin.livy.displayAppInfo"));
   }
 
   protected static Map<String, Integer> getUserSessionMap() {
@@ -67,24 +74,42 @@ public class LivySparkInterpreter extends Interpreter {
   @Override
   public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
     try {
+      Integer sessionId = null;
       if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
         try {
-          userSessionMap.put(
-              interpreterContext.getAuthenticationInfo().getUser(),
-              livyHelper.createSession(
-                  interpreterContext,
-                  "spark")
-          );
+          sessionId = livyHelper.createSession(interpreterContext, "spark");
+          userSessionMap.put(interpreterContext.getAuthenticationInfo().getUser(), sessionId);
+          if (displayAppInfo) {
+            String appId = extractStatementResult(
+                    livyHelper.interpret("sc.applicationId", interpreterContext, userSessionMap)
+                            .message());
+            livyHelper.interpret(
+                    "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
+                    interpreterContext, userSessionMap);
+            String webUI = extractStatementResult(
+                    livyHelper.interpret(
+                            "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
+                            interpreterContext, userSessionMap).message());
+            sessionId2AppIdMap.put(sessionId, appId);
+            sessionId2WebUIMap.put(sessionId, webUI);
+            LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
+                    sessionId, appId, webUI);
+          } else {
+            LOGGER.info("Create livy session with sessionId: {}", sessionId);
+          }
         } catch (Exception e) {
           LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
           return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
         }
+      } else {
+        sessionId = userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser());
       }
       if (line == null || line.trim().length() == 0) {
         return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
       }
 
-      return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
+      return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out,
+              sessionId2AppIdMap.get(sessionId), sessionId2WebUIMap.get(sessionId), displayAppInfo);
     } catch (Exception e) {
       LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
       return new InterpreterResult(InterpreterResult.Code.ERROR,
@@ -92,6 +117,23 @@ public class LivySparkInterpreter extends Interpreter {
     }
   }
 
+  /**
+   * Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
+   * from following:
+   * res0: String = application_1473129941656_0048
+   * @param result
+   * @return
+   */
+  private static String extractStatementResult(String result) {
+    int pos = -1;
+    if ((pos = result.indexOf("=")) >= 0) {
+      return result.substring(pos + 1).trim();
+    } else {
+      throw new RuntimeException("No result can be extracted from '" + result + "', " +
+              "something must be wrong");
+    }
+  }
+
   @Override
   public void cancel(InterpreterContext context) {
     livyHelper.cancelHTTP(context.getParagraphId());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d5d758c/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index bad6830..4f2cf3d 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -86,6 +86,11 @@
         "propertyName": "livy.spark.jars.packages",
         "defaultValue": "",
         "description": "Adding extra libraries to livy interpreter"
+      },
+      "livy.spark.displayAppInfo": {
+        "propertyName": "zeppelin.livy.displayAppInfo",
+        "defaultValue": "false",
+        "description": "Whether display app info"
       }
     },
     "editor": {