You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/01/14 16:40:21 UTC

zeppelin git commit: ZEPPELIN-1293. Livy Interpreter: Automatically attach or create to a new session

Repository: zeppelin
Updated Branches:
  refs/heads/master 69bc353d3 -> 00742ffdb


ZEPPELIN-1293. Livy Interpreter: Automatically attach or create to a new session

### What is this PR for?
By default, a Livy session expires after one hour. This PR automatically creates a new session for the user if the current session has expired, and also displays the session-expiration information in the frontend. The expiration message is displayed only the first time the session is recreated; after that, the message is not displayed.

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1293

### How should this be tested?
Tested manually.
![image](https://cloud.githubusercontent.com/assets/164491/21761175/2473c0c0-d68c-11e6-8f39-9e87333c6168.png)

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #1861 from zjffdu/ZEPPELIN-1293 and squashes the following commits:

e174593 [Jeff Zhang] minor update on warning message
30c3569 [Jeff Zhang] address comments
88f0d9a [Jeff Zhang] ZEPPELIN-1293. Livy Interpreter: Automatically attach or create to a new session


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/00742ffd
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/00742ffd
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/00742ffd

Branch: refs/heads/master
Commit: 00742ffdb4cc349ef3ee99dced9e08e2b5a404f6
Parents: 69bc353
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Jan 13 11:30:13 2017 +0800
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Sun Jan 15 01:40:12 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/livy/BaseLivyInterprereter.java    | 74 ++++++++++++++++----
 .../zeppelin/livy/LivySparkSQLInterpreter.java  | 65 +++++++++--------
 2 files changed, 97 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/00742ffd/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index 0c8c8e2..8ed4622 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -21,10 +21,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.apache.zeppelin.interpreter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpEntity;
@@ -39,6 +36,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Base class for livy interpreters.
@@ -48,10 +46,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
   protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
   private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
 
-  protected SessionInfo sessionInfo;
+  protected volatile SessionInfo sessionInfo;
   private String livyURL;
   private long sessionCreationTimeout;
   protected boolean displayAppInfo;
+  private AtomicBoolean sessionExpired = new AtomicBoolean(false);
 
   public BaseLivyInterprereter(Properties property) {
     super(property);
@@ -90,16 +89,17 @@ public abstract class BaseLivyInterprereter extends Interpreter {
         // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
         // explicitly by ourselves.
         sessionInfo.appId = extractStatementResult(
-            interpret("sc.applicationId", false).message()
+            interpret("sc.applicationId", false, false).message()
                 .get(0).getData());
       }
 
       interpret(
-          "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false);
+          "val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
+          false, false);
       if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
         sessionInfo.webUIAddress = extractStatementResult(
             interpret(
-                "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false)
+                "webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false)
                 .message().get(0).getData());
       } else {
         sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
@@ -120,7 +120,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
     }
 
     try {
-      return interpret(st, this.displayAppInfo);
+      return interpret(st, this.displayAppInfo, true);
     } catch (LivyException e) {
       LOGGER.error("Fail to interpret:" + st, e);
       return new InterpreterResult(InterpreterResult.Code.ERROR,
@@ -206,9 +206,26 @@ public abstract class BaseLivyInterprereter extends Interpreter {
     return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
   }
 
-  public InterpreterResult interpret(String code, boolean displayAppInfo)
+  public InterpreterResult interpret(String code, boolean displayAppInfo,
+                                     boolean appendSessionExpired)
       throws LivyException {
-    StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code));
+    StatementInfo stmtInfo = null;
+    boolean sessionExpired = false;
+    try {
+      stmtInfo = executeStatement(new ExecuteRequest(code));
+    } catch (SessionNotFoundException e) {
+      LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
+      sessionExpired = true;
+      // we don't want to create multiple sessions because it is possible to have multiple thread
+      // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
+      // to check session status again in this sync block
+      synchronized (this) {
+        if (isSessionExpired()) {
+          initLivySession();
+        }
+      }
+      stmtInfo = executeStatement(new ExecuteRequest(code));
+    }
     // pull the statement status
     while (!stmtInfo.isAvailable()) {
       try {
@@ -219,7 +236,38 @@ public abstract class BaseLivyInterprereter extends Interpreter {
       }
       stmtInfo = getStatementInfo(stmtInfo.id);
     }
-    return getResultFromStatementInfo(stmtInfo, displayAppInfo);
+    if (appendSessionExpired) {
+      return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
+          sessionExpired);
+    } else {
+      return getResultFromStatementInfo(stmtInfo, displayAppInfo);
+    }
+  }
+
+  private boolean isSessionExpired() throws LivyException {
+    try {
+      getSessionInfo(sessionInfo.id);
+      return false;
+    } catch (SessionNotFoundException e) {
+      return true;
+    } catch (LivyException e) {
+      throw e;
+    }
+  }
+
+  private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) {
+    if (sessionExpired) {
+      InterpreterResult result2 = new InterpreterResult(result.code());
+      result2.add(InterpreterResult.Type.HTML,
+          "<font color=\"red\">Previous livy session is expired, new livy session is created. " +
+              "Paragraphs that depend on this paragraph need to be re-executed!" + "</font>");
+      for (InterpreterResultMessage message : result.message()) {
+        result2.add(message.getType(), message.getData());
+      }
+      return result2;
+    } else {
+      return result;
+    }
   }
 
   private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
@@ -340,7 +388,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
         || response.getStatusCode().value() == 201
         || response.getStatusCode().value() == 404) {
       String responseBody = response.getBody();
-      if (responseBody.matches("Session '\\d+' not found.")) {
+      if (responseBody.matches("\"Session '\\d+' not found.\"")) {
         throw new SessionNotFoundException(responseBody);
       } else {
         return responseBody;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/00742ffd/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index cdc8b5b..0e78860 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -51,7 +51,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
     // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
     // to judge whether it is using spark2.
     try {
-      InterpreterResult result = sparkInterpreter.interpret("spark", false);
+      InterpreterResult result = sparkInterpreter.interpret("spark", false, false);
       if (result.code() == InterpreterResult.Code.SUCCESS &&
           result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
         LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
@@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
         isSpark2 = true;
       } else {
         // spark 1.x
-        result = sparkInterpreter.interpret("sqlContext", false);
+        result = sparkInterpreter.interpret("sqlContext", false, false);
         if (result.code() == InterpreterResult.Code.SUCCESS) {
           LOGGER.info("sqlContext is detected.");
         } else if (result.code() == InterpreterResult.Code.ERROR) {
@@ -68,7 +68,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
           LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
           result = sparkInterpreter.interpret(
               "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
-                  + "import sqlContext.implicits._", false);
+                  + "import sqlContext.implicits._", false, false);
           if (result.code() == InterpreterResult.Code.ERROR) {
             throw new LivyException("Fail to create SQLContext," +
                 result.message().get(0).getData());
@@ -113,37 +113,44 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
       } else {
         sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
       }
-      InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo);
-
-      if (res.code() == InterpreterResult.Code.SUCCESS) {
-        StringBuilder resMsg = new StringBuilder();
-        resMsg.append("%table ");
-        String[] rows = res.message().get(0).getData().split("\n");
-        String[] headers = rows[1].split("\\|");
-        for (int head = 1; head < headers.length; head++) {
-          resMsg.append(headers[head].trim()).append("\t");
-        }
-        resMsg.append("\n");
-        if (rows[3].indexOf("+") == 0) {
-
-        } else {
-          for (int cols = 3; cols < rows.length - 1; cols++) {
-            String[] col = rows[cols].split("\\|");
-            for (int data = 1; data < col.length; data++) {
-              resMsg.append(col[data].trim()).append("\t");
+      InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true);
+
+      if (result.code() == InterpreterResult.Code.SUCCESS) {
+        InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS);
+        for (InterpreterResultMessage message : result.message()) {
+          // convert Text type to Table type. We assume the text type must be the sql output. This
+          // assumption is correct for now. Ideally livy should return table type. We may do it in
+          // the future release of livy.
+          if (message.getType() == InterpreterResult.Type.TEXT) {
+            StringBuilder resMsg = new StringBuilder();
+            String[] rows = message.getData().split("\n");
+            String[] headers = rows[1].split("\\|");
+            for (int head = 1; head < headers.length; head++) {
+              resMsg.append(headers[head].trim()).append("\t");
             }
             resMsg.append("\n");
+            if (rows[3].indexOf("+") == 0) {
+
+            } else {
+              for (int cols = 3; cols < rows.length - 1; cols++) {
+                String[] col = rows[cols].split("\\|");
+                for (int data = 1; data < col.length; data++) {
+                  resMsg.append(col[data].trim()).append("\t");
+                }
+                resMsg.append("\n");
+              }
+            }
+            if (rows[rows.length - 1].indexOf("only") == 0) {
+              resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
+            }
+            result2.add(InterpreterResult.Type.TABLE, resMsg.toString());
+          } else {
+            result2.add(message.getType(), message.getData());
           }
         }
-        if (rows[rows.length - 1].indexOf("only") == 0) {
-          resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
-        }
-
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS,
-            resMsg.toString()
-        );
+        return result2;
       } else {
-        return res;
+        return result;
       }
     } catch (Exception e) {
       LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);