You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/05/02 01:09:43 UTC

zeppelin git commit: [ZEPPELIN-2465] Minor code fixes for the livy package

Repository: zeppelin
Updated Branches:
  refs/heads/master 1ff545321 -> 55cb6b894


[ZEPPELIN-2465] Minor code fixes for the livy package

### What is this PR for?
Minor code fixes for the livy package.
The code fixes include :
Fixing a typo in a classname - BaseLivyInterprereter to BaseLivyInterpreter
Removing an unused variable in BaseLivyInterpreter
Removing unused imports in a few classes

### What type of PR is it?
Refactoring

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2465

### How should this be tested?
No need to test as there is no change in funcionality

### Questions:
* Does the licenses files need update? NO
* Is there breaking changes for older versions? NO
* Does this needs documentation? NO

Author: Benoy Antony <be...@apache.org>

Closes #2297 from benoyantony/refactor and squashes the following commits:

bd6d8ff [Benoy Antony] ZEPPELIN-2465 Minor code fixes for the livy package


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/55cb6b89
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/55cb6b89
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/55cb6b89

Branch: refs/heads/master
Commit: 55cb6b894b1a62e4e941e63abaad834d15ca733e
Parents: 1ff5453
Author: Benoy Antony <be...@apache.org>
Authored: Thu Apr 27 23:51:48 2017 -0700
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue May 2 09:09:30 2017 +0800

----------------------------------------------------------------------
 .../zeppelin/livy/BaseLivyInterprereter.java    | 682 -------------------
 .../zeppelin/livy/BaseLivyInterpreter.java      | 680 ++++++++++++++++++
 .../zeppelin/livy/LivyPySpark3Interpreter.java  |  10 -
 .../livy/LivyPySparkBaseInterpreter.java        |   2 +-
 .../zeppelin/livy/LivyPySparkInterpreter.java   |  10 -
 .../zeppelin/livy/LivySparkInterpreter.java     |  13 +-
 .../zeppelin/livy/LivySparkRInterpreter.java    |  12 +-
 .../zeppelin/livy/LivySparkSQLInterpreter.java  |   2 +-
 .../apache/zeppelin/livy/LivyInterpreterIT.java |   2 +-
 9 files changed, 685 insertions(+), 728 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
deleted file mode 100644
index 77c98d9..0000000
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ /dev/null
@@ -1,682 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.livy;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.conn.ssl.SSLContexts;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.zeppelin.interpreter.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
-import org.springframework.security.kerberos.client.KerberosRestTemplate;
-import org.springframework.web.client.HttpClientErrorException;
-import org.springframework.web.client.RestClientException;
-import org.springframework.web.client.RestTemplate;
-
-import javax.net.ssl.SSLContext;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyStore;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-
-/**
- * Base class for livy interpreters.
- */
-public abstract class BaseLivyInterprereter extends Interpreter {
-
-  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
-  private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
-  private static String SESSION_NOT_FOUND_PATTERN = "\"Session '\\d+' not found.\"";
-
-  protected volatile SessionInfo sessionInfo;
-  private String livyURL;
-  private int sessionCreationTimeout;
-  private int pullStatusInterval;
-  protected boolean displayAppInfo;
-  private AtomicBoolean sessionExpired = new AtomicBoolean(false);
-  protected LivyVersion livyVersion;
-  private RestTemplate restTemplate;
-
-  Set<Object> paragraphsToCancel = Collections.newSetFromMap(
-      new ConcurrentHashMap<Object, Boolean>());
-  private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
-      new ConcurrentHashMap<>();
-
-  public BaseLivyInterprereter(Properties property) {
-    super(property);
-    this.livyURL = property.getProperty("zeppelin.livy.url");
-    this.displayAppInfo = Boolean.parseBoolean(
-        property.getProperty("zeppelin.livy.displayAppInfo", "false"));
-    this.sessionCreationTimeout = Integer.parseInt(
-        property.getProperty("zeppelin.livy.session.create_timeout", 120 + ""));
-    this.pullStatusInterval = Integer.parseInt(
-        property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + ""));
-    this.restTemplate = createRestTemplate();
-  }
-
-  public abstract String getSessionKind();
-
-  @Override
-  public void open() {
-    try {
-      initLivySession();
-    } catch (LivyException e) {
-      String msg = "Fail to create session, please check livy interpreter log and " +
-          "livy server log";
-      throw new RuntimeException(msg, e);
-    }
-  }
-
-  @Override
-  public void close() {
-    if (sessionInfo != null) {
-      closeSession(sessionInfo.id);
-      // reset sessionInfo to null so that we won't close it twice.
-      sessionInfo = null;
-    }
-  }
-
-  protected void initLivySession() throws LivyException {
-    this.sessionInfo = createSession(getUserName(), getSessionKind());
-    if (displayAppInfo) {
-      if (sessionInfo.appId == null) {
-        // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
-        // explicitly by ourselves.
-        sessionInfo.appId = extractAppId();
-      }
-
-      if (sessionInfo.appInfo == null ||
-          StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
-        sessionInfo.webUIAddress = extractWebUIAddress();
-      } else {
-        sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
-      }
-      LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
-          sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
-    } else {
-      LOGGER.info("Create livy session successfully with sessionId: {}", this.sessionInfo.id);
-    }
-    // check livy version
-    try {
-      this.livyVersion = getLivyVersion();
-      LOGGER.info("Use livy " + livyVersion);
-    } catch (APINotFoundException e) {
-      this.livyVersion = new LivyVersion("0.2.0");
-      LOGGER.info("Use livy 0.2.0");
-    }
-  }
-
-  protected abstract String extractAppId() throws LivyException;
-
-  protected abstract String extractWebUIAddress() throws LivyException;
-
-  public SessionInfo getSessionInfo() {
-    return sessionInfo;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    if (StringUtils.isEmpty(st)) {
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
-    }
-
-    try {
-      return interpret(st, context.getParagraphId(), this.displayAppInfo, true);
-    } catch (LivyException e) {
-      LOGGER.error("Fail to interpret:" + st, e);
-      return new InterpreterResult(InterpreterResult.Code.ERROR,
-          InterpreterUtils.getMostRelevantMessage(e));
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    paragraphsToCancel.add(context.getParagraphId());
-    LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation.");
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.SIMPLE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    if (livyVersion.isGetProgressSupported()) {
-      String paraId = context.getParagraphId();
-      Integer progress = paragraphId2StmtProgressMap.get(paraId);
-      return progress == null ? 0 : progress;
-    }
-    return 0;
-  }
-
-  private SessionInfo createSession(String user, String kind)
-      throws LivyException {
-    try {
-      Map<String, String> conf = new HashMap<>();
-      for (Map.Entry<Object, Object> entry : property.entrySet()) {
-        if (entry.getKey().toString().startsWith("livy.spark.") &&
-            !entry.getValue().toString().isEmpty())
-          conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
-      }
-
-      CreateSessionRequest request = new CreateSessionRequest(kind,
-          user == null || user.equals("anonymous") ? null : user, conf);
-      SessionInfo sessionInfo = SessionInfo.fromJson(
-          callRestAPI("/sessions", "POST", request.toJson()));
-      long start = System.currentTimeMillis();
-      // pull the session status until it is idle or timeout
-      while (!sessionInfo.isReady()) {
-        if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
-          String msg = "The creation of session " + sessionInfo.id + " is timeout within "
-              + sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
-              + ", log: " + sessionInfo.log;
-          throw new LivyException(msg);
-        }
-        Thread.sleep(pullStatusInterval);
-        sessionInfo = getSessionInfo(sessionInfo.id);
-        LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
-            sessionInfo.appId);
-        if (sessionInfo.isFinished()) {
-          String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
-              + ", log: " + sessionInfo.log;
-          throw new LivyException(msg);
-        }
-      }
-      return sessionInfo;
-    } catch (Exception e) {
-      LOGGER.error("Error when creating livy session for user " + user, e);
-      throw new LivyException(e);
-    }
-  }
-
-  private SessionInfo getSessionInfo(int sessionId) throws LivyException {
-    return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
-  }
-
-  public InterpreterResult interpret(String code,
-                                     String paragraphId,
-                                     boolean displayAppInfo,
-                                     boolean appendSessionExpired) throws LivyException {
-    StatementInfo stmtInfo = null;
-    boolean sessionExpired = false;
-    try {
-      try {
-        stmtInfo = executeStatement(new ExecuteRequest(code));
-      } catch (SessionNotFoundException e) {
-        LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
-        sessionExpired = true;
-        // we don't want to create multiple sessions because it is possible to have multiple thread
-        // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
-        // to check session status again in this sync block
-        synchronized (this) {
-          if (isSessionExpired()) {
-            initLivySession();
-          }
-        }
-        stmtInfo = executeStatement(new ExecuteRequest(code));
-      }
-      // pull the statement status
-      while (!stmtInfo.isAvailable()) {
-        if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) {
-          cancel(stmtInfo.id, paragraphId);
-          return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
-        }
-        try {
-          Thread.sleep(pullStatusInterval);
-        } catch (InterruptedException e) {
-          LOGGER.error("InterruptedException when pulling statement status.", e);
-          throw new LivyException(e);
-        }
-        stmtInfo = getStatementInfo(stmtInfo.id);
-        if (paragraphId != null) {
-          paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
-        }
-      }
-      if (appendSessionExpired) {
-        return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
-            sessionExpired);
-      } else {
-        return getResultFromStatementInfo(stmtInfo, displayAppInfo);
-      }
-    } finally {
-      if (paragraphId != null) {
-        paragraphId2StmtProgressMap.remove(paragraphId);
-        paragraphsToCancel.remove(paragraphId);
-      }
-    }
-  }
-
-  private void cancel(int id, String paragraphId) {
-    if (livyVersion.isCancelSupported()) {
-      try {
-        LOGGER.info("Cancelling statement " + id);
-        cancelStatement(id);
-      } catch (LivyException e) {
-        LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
-      }
-      finally {
-        paragraphsToCancel.remove(paragraphId);
-      }
-    } else {
-      LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
-      paragraphsToCancel.clear();
-    }
-  }
-
-  protected LivyVersion getLivyVersion() throws LivyException {
-    return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version));
-  }
-
-  private boolean isSessionExpired() throws LivyException {
-    try {
-      getSessionInfo(sessionInfo.id);
-      return false;
-    } catch (SessionNotFoundException e) {
-      return true;
-    } catch (LivyException e) {
-      throw e;
-    }
-  }
-
-  private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) {
-    if (sessionExpired) {
-      InterpreterResult result2 = new InterpreterResult(result.code());
-      result2.add(InterpreterResult.Type.HTML,
-          "<font color=\"red\">Previous livy session is expired, new livy session is created. " +
-              "Paragraphs that depend on this paragraph need to be re-executed!" + "</font>");
-      for (InterpreterResultMessage message : result.message()) {
-        result2.add(message.getType(), message.getData());
-      }
-      return result2;
-    } else {
-      return result;
-    }
-  }
-
-
-  private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
-                                                       boolean displayAppInfo) {
-    if (stmtInfo.output != null && stmtInfo.output.isError()) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, stmtInfo.output.evalue);
-    } else if (stmtInfo.isCancelled()) {
-      // corner case, output might be null if it is cancelled.
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
-    } else if (stmtInfo.output == null) {
-      // This case should never happen, just in case
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Empty output");
-    } else {
-      //TODO(zjffdu) support other types of data (like json, image and etc)
-      String result = stmtInfo.output.data.plain_text;
-
-      // check table magic result first
-      if (stmtInfo.output.data.application_livy_table_json != null) {
-        StringBuilder outputBuilder = new StringBuilder();
-        boolean notFirstColumn = false;
-
-        for (Map header : stmtInfo.output.data.application_livy_table_json.headers) {
-          if (notFirstColumn) {
-            outputBuilder.append("\t");
-          }
-          outputBuilder.append(header.get("name"));
-          notFirstColumn = true;
-        }
-
-        outputBuilder.append("\n");
-        for (List<Object> row : stmtInfo.output.data.application_livy_table_json.records) {
-          outputBuilder.append(StringUtils.join(row, "\t"));
-          outputBuilder.append("\n");
-        }
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS,
-            InterpreterResult.Type.TABLE, outputBuilder.toString());
-      } else if (stmtInfo.output.data.image_png != null) {
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS,
-            InterpreterResult.Type.IMG, (String) stmtInfo.output.data.image_png);
-      } else if (result != null) {
-        result = result.trim();
-        if (result.startsWith("<link")
-            || result.startsWith("<script")
-            || result.startsWith("<style")
-            || result.startsWith("<div")) {
-          result = "%html " + result;
-        }
-      }
-
-      if (displayAppInfo) {
-        InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
-        interpreterResult.add(result);
-        String appInfoHtml = "<hr/>Spark Application Id: " + sessionInfo.appId + "<br/>"
-            + "Spark WebUI: <a href=\"" + sessionInfo.webUIAddress + "\">"
-            + sessionInfo.webUIAddress + "</a>";
-        interpreterResult.add(InterpreterResult.Type.HTML, appInfoHtml);
-        return interpreterResult;
-      } else {
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS, result);
-      }
-    }
-  }
-
-  private StatementInfo executeStatement(ExecuteRequest executeRequest)
-      throws LivyException {
-    return StatementInfo.fromJson(callRestAPI("/sessions/" + sessionInfo.id + "/statements", "POST",
-        executeRequest.toJson()));
-  }
-
-  private StatementInfo getStatementInfo(int statementId)
-      throws LivyException {
-    return StatementInfo.fromJson(
-        callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET"));
-  }
-
-  private void cancelStatement(int statementId) throws LivyException {
-    callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST");
-  }
-
-
-  private RestTemplate createRestTemplate() {
-    HttpClient httpClient = null;
-    if (livyURL.startsWith("https:")) {
-      String keystoreFile = property.getProperty("zeppelin.livy.ssl.trustStore");
-      String password = property.getProperty("zeppelin.livy.ssl.trustStorePassword");
-      if (StringUtils.isBlank(keystoreFile)) {
-        throw new RuntimeException("No zeppelin.livy.ssl.trustStore specified for livy ssl");
-      }
-      if (StringUtils.isBlank(password)) {
-        throw new RuntimeException("No zeppelin.livy.ssl.trustStorePassword specified " +
-            "for livy ssl");
-      }
-      FileInputStream inputStream = null;
-      try {
-        inputStream = new FileInputStream(keystoreFile);
-        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
-        trustStore.load(new FileInputStream(keystoreFile), password.toCharArray());
-        SSLContext sslContext = SSLContexts.custom()
-            .loadTrustMaterial(trustStore)
-            .build();
-        SSLConnectionSocketFactory csf = new SSLConnectionSocketFactory(sslContext);
-        httpClient = HttpClients.custom().setSSLSocketFactory(csf).build();
-      } catch (Exception e) {
-        throw new RuntimeException("Failed to create SSL HttpClient", e);
-      } finally {
-        if (inputStream != null) {
-          try {
-            inputStream.close();
-          } catch (IOException e) {
-            LOGGER.error("Failed to close keystore file", e);
-          }
-        }
-      }
-    }
-
-    String keytabLocation = property.getProperty("zeppelin.livy.keytab");
-    String principal = property.getProperty("zeppelin.livy.principal");
-    if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) {
-      if (httpClient == null) {
-        return new KerberosRestTemplate(keytabLocation, principal);
-      } else {
-        return new KerberosRestTemplate(keytabLocation, principal, httpClient);
-      }
-    }
-    if (httpClient == null) {
-      return new RestTemplate();
-    } else {
-      return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient));
-    }
-  }
-
-  private String callRestAPI(String targetURL, String method) throws LivyException {
-    return callRestAPI(targetURL, method, "");
-  }
-
-  private String callRestAPI(String targetURL, String method, String jsonData)
-      throws LivyException {
-    targetURL = livyURL + targetURL;
-    LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
-    HttpHeaders headers = new HttpHeaders();
-    headers.add("Content-Type", "application/json");
-    headers.add("X-Requested-By", "zeppelin");
-    ResponseEntity<String> response = null;
-    try {
-      if (method.equals("POST")) {
-        HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
-        response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
-      } else if (method.equals("GET")) {
-        HttpEntity<String> entity = new HttpEntity<>(headers);
-        response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
-      } else if (method.equals("DELETE")) {
-        HttpEntity<String> entity = new HttpEntity<>(headers);
-        response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
-      }
-    } catch (HttpClientErrorException e) {
-      response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
-      LOGGER.error(String.format("Error with %s StatusCode: %s",
-          response.getStatusCode().value(), e.getResponseBodyAsString()));
-    } catch (RestClientException e) {
-      // Exception happens when kerberos is enabled.
-      if (e.getCause() instanceof HttpClientErrorException) {
-        HttpClientErrorException cause = (HttpClientErrorException) e.getCause();
-        if (cause.getResponseBodyAsString().matches(SESSION_NOT_FOUND_PATTERN)) {
-          throw new SessionNotFoundException(cause.getResponseBodyAsString());
-        }
-        throw new LivyException(cause.getResponseBodyAsString() + "\n"
-            + ExceptionUtils.getFullStackTrace(ExceptionUtils.getRootCause(e)));
-      }
-      throw new LivyException(e);
-    }
-    if (response == null) {
-      throw new LivyException("No http response returned");
-    }
-    LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
-        response.getBody());
-    if (response.getStatusCode().value() == 200
-        || response.getStatusCode().value() == 201) {
-      return response.getBody();
-    } else if (response.getStatusCode().value() == 404) {
-      if (response.getBody().matches(SESSION_NOT_FOUND_PATTERN)) {
-        throw new SessionNotFoundException(response.getBody());
-      } else {
-        throw new APINotFoundException("No rest api found for " + targetURL +
-            ", " + response.getStatusCode());
-      }
-    } else {
-      String responseString = response.getBody();
-      if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
-        return responseString;
-      }
-      throw new LivyException(String.format("Error with %s StatusCode: %s",
-          response.getStatusCode().value(), responseString));
-    }
-  }
-
-  private void closeSession(int sessionId) {
-    try {
-      callRestAPI("/sessions/" + sessionId, "DELETE");
-    } catch (Exception e) {
-      LOGGER.error(String.format("Error closing session for user with session ID: %s",
-          sessionId), e);
-    }
-  }
-
-  /*
-  * We create these POJO here to accommodate livy 0.3 which is not released yet. livy rest api has
-  * some changes from version to version. So we create these POJO in zeppelin side to accommodate
-  * incompatibility between versions. Later, when livy become more stable, we could just depend on
-  * livy client jar.
-  */
-  private static class CreateSessionRequest {
-    public final String kind;
-    @SerializedName("proxyUser")
-    public final String user;
-    public final Map<String, String> conf;
-
-    public CreateSessionRequest(String kind, String user, Map<String, String> conf) {
-      this.kind = kind;
-      this.user = user;
-      this.conf = conf;
-    }
-
-    public String toJson() {
-      return gson.toJson(this);
-    }
-  }
-
-  /**
-   *
-   */
-  public static class SessionInfo {
-
-    public final int id;
-    public String appId;
-    public String webUIAddress;
-    public final String owner;
-    public final String proxyUser;
-    public final String state;
-    public final String kind;
-    public final Map<String, String> appInfo;
-    public final List<String> log;
-
-    public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
-                       String kind, Map<String, String> appInfo, List<String> log) {
-      this.id = id;
-      this.appId = appId;
-      this.owner = owner;
-      this.proxyUser = proxyUser;
-      this.state = state;
-      this.kind = kind;
-      this.appInfo = appInfo;
-      this.log = log;
-    }
-
-    public boolean isReady() {
-      return state.equals("idle");
-    }
-
-    public boolean isFinished() {
-      return state.equals("error") || state.equals("dead") || state.equals("success");
-    }
-
-    public static SessionInfo fromJson(String json) {
-      return gson.fromJson(json, SessionInfo.class);
-    }
-  }
-
-  private static class ExecuteRequest {
-    public final String code;
-
-    public ExecuteRequest(String code) {
-      this.code = code;
-    }
-
-    public String toJson() {
-      return gson.toJson(this);
-    }
-  }
-
-  private static class StatementInfo {
-    public Integer id;
-    public String state;
-    public double progress;
-    public StatementOutput output;
-
-    public StatementInfo() {
-    }
-
-    public static StatementInfo fromJson(String json) {
-      return gson.fromJson(json, StatementInfo.class);
-    }
-
-    public boolean isAvailable() {
-      return state.equals("available") || state.equals("cancelled");
-    }
-
-    public boolean isCancelled() {
-      return state.equals("cancelled");
-    }
-
-    private static class StatementOutput {
-      public String status;
-      public String execution_count;
-      public Data data;
-      public String ename;
-      public String evalue;
-      public Object traceback;
-      public TableMagic tableMagic;
-
-      public boolean isError() {
-        return status.equals("error");
-      }
-
-      public String toJson() {
-        return gson.toJson(this);
-      }
-
-      private static class Data {
-        @SerializedName("text/plain")
-        public String plain_text;
-        @SerializedName("image/png")
-        public String image_png;
-        @SerializedName("application/json")
-        public String application_json;
-        @SerializedName("application/vnd.livy.table.v1+json")
-        public TableMagic application_livy_table_json;
-      }
-
-      private static class TableMagic {
-        @SerializedName("headers")
-        List<Map> headers;
-
-        @SerializedName("data")
-        List<List> records;
-      }
-    }
-  }
-
-  private static class LivyVersionResponse {
-    public String url;
-    public String branch;
-    public String revision;
-    public String version;
-    public String date;
-    public String user;
-
-    public static LivyVersionResponse fromJson(String json) {
-      return gson.fromJson(json, LivyVersionResponse.class);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
new file mode 100644
index 0000000..b52ba16
--- /dev/null
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.livy;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLContexts;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.zeppelin.interpreter.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.security.kerberos.client.KerberosRestTemplate;
+import org.springframework.web.client.HttpClientErrorException;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import javax.net.ssl.SSLContext;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+
+/**
+ * Base class for livy interpreters.
+ */
+public abstract class BaseLivyInterpreter extends Interpreter {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterpreter.class);
+  private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
+  private static String SESSION_NOT_FOUND_PATTERN = "\"Session '\\d+' not found.\"";
+
+  protected volatile SessionInfo sessionInfo;
+  private String livyURL;
+  private int sessionCreationTimeout;
+  private int pullStatusInterval;
+  protected boolean displayAppInfo;
+  protected LivyVersion livyVersion;
+  private RestTemplate restTemplate;
+
+  Set<Object> paragraphsToCancel = Collections.newSetFromMap(
+      new ConcurrentHashMap<Object, Boolean>());
+  private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
+      new ConcurrentHashMap<>();
+
+  public BaseLivyInterpreter(Properties property) {
+    super(property);
+    this.livyURL = property.getProperty("zeppelin.livy.url");
+    this.displayAppInfo = Boolean.parseBoolean(
+        property.getProperty("zeppelin.livy.displayAppInfo", "false"));
+    this.sessionCreationTimeout = Integer.parseInt(
+        property.getProperty("zeppelin.livy.session.create_timeout", 120 + ""));
+    this.pullStatusInterval = Integer.parseInt(
+        property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + ""));
+    this.restTemplate = createRestTemplate();
+  }
+
+  public abstract String getSessionKind();
+
+  @Override
+  public void open() {
+    try {
+      initLivySession();
+    } catch (LivyException e) {
+      String msg = "Fail to create session, please check livy interpreter log and " +
+          "livy server log";
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (sessionInfo != null) {
+      closeSession(sessionInfo.id);
+      // reset sessionInfo to null so that we won't close it twice.
+      sessionInfo = null;
+    }
+  }
+
+  protected void initLivySession() throws LivyException {
+    this.sessionInfo = createSession(getUserName(), getSessionKind());
+    if (displayAppInfo) {
+      if (sessionInfo.appId == null) {
+        // livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
+        // explicitly by ourselves.
+        sessionInfo.appId = extractAppId();
+      }
+
+      if (sessionInfo.appInfo == null ||
+          StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
+        sessionInfo.webUIAddress = extractWebUIAddress();
+      } else {
+        sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
+      }
+      LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
+          sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
+    } else {
+      LOGGER.info("Create livy session successfully with sessionId: {}", this.sessionInfo.id);
+    }
+    // check livy version
+    try {
+      this.livyVersion = getLivyVersion();
+      LOGGER.info("Use livy " + livyVersion);
+    } catch (APINotFoundException e) {
+      this.livyVersion = new LivyVersion("0.2.0");
+      LOGGER.info("Use livy 0.2.0");
+    }
+  }
+
+  protected abstract String extractAppId() throws LivyException;
+
+  protected abstract String extractWebUIAddress() throws LivyException;
+
+  public SessionInfo getSessionInfo() {
+    return sessionInfo;
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (StringUtils.isEmpty(st)) {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
+    }
+
+    try {
+      return interpret(st, context.getParagraphId(), this.displayAppInfo, true);
+    } catch (LivyException e) {
+      LOGGER.error("Fail to interpret:" + st, e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR,
+          InterpreterUtils.getMostRelevantMessage(e));
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    paragraphsToCancel.add(context.getParagraphId());
+    LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation.");
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    if (livyVersion.isGetProgressSupported()) {
+      String paraId = context.getParagraphId();
+      Integer progress = paragraphId2StmtProgressMap.get(paraId);
+      return progress == null ? 0 : progress;
+    }
+    return 0;
+  }
+
+  private SessionInfo createSession(String user, String kind)
+      throws LivyException {
+    try {
+      Map<String, String> conf = new HashMap<>();
+      for (Map.Entry<Object, Object> entry : property.entrySet()) {
+        if (entry.getKey().toString().startsWith("livy.spark.") &&
+            !entry.getValue().toString().isEmpty())
+          conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
+      }
+
+      CreateSessionRequest request = new CreateSessionRequest(kind,
+          user == null || user.equals("anonymous") ? null : user, conf);
+      SessionInfo sessionInfo = SessionInfo.fromJson(
+          callRestAPI("/sessions", "POST", request.toJson()));
+      long start = System.currentTimeMillis();
+      // pull the session status until it is idle or timeout
+      while (!sessionInfo.isReady()) {
+        if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
+          String msg = "The creation of session " + sessionInfo.id + " is timeout within "
+              + sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
+              + ", log: " + sessionInfo.log;
+          throw new LivyException(msg);
+        }
+        Thread.sleep(pullStatusInterval);
+        sessionInfo = getSessionInfo(sessionInfo.id);
+        LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
+            sessionInfo.appId);
+        if (sessionInfo.isFinished()) {
+          String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
+              + ", log: " + sessionInfo.log;
+          throw new LivyException(msg);
+        }
+      }
+      return sessionInfo;
+    } catch (Exception e) {
+      LOGGER.error("Error when creating livy session for user " + user, e);
+      throw new LivyException(e);
+    }
+  }
+
+  private SessionInfo getSessionInfo(int sessionId) throws LivyException {
+    return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
+  }
+
+  public InterpreterResult interpret(String code,
+                                     String paragraphId,
+                                     boolean displayAppInfo,
+                                     boolean appendSessionExpired) throws LivyException {
+    StatementInfo stmtInfo = null;
+    boolean sessionExpired = false;
+    try {
+      try {
+        stmtInfo = executeStatement(new ExecuteRequest(code));
+      } catch (SessionNotFoundException e) {
+        LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
+        sessionExpired = true;
+        // we don't want to create multiple sessions because it is possible to have multiple thread
+        // to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
+        // to check session status again in this sync block
+        synchronized (this) {
+          if (isSessionExpired()) {
+            initLivySession();
+          }
+        }
+        stmtInfo = executeStatement(new ExecuteRequest(code));
+      }
+      // pull the statement status
+      while (!stmtInfo.isAvailable()) {
+        if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) {
+          cancel(stmtInfo.id, paragraphId);
+          return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
+        }
+        try {
+          Thread.sleep(pullStatusInterval);
+        } catch (InterruptedException e) {
+          LOGGER.error("InterruptedException when pulling statement status.", e);
+          throw new LivyException(e);
+        }
+        stmtInfo = getStatementInfo(stmtInfo.id);
+        if (paragraphId != null) {
+          paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+        }
+      }
+      if (appendSessionExpired) {
+        return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
+            sessionExpired);
+      } else {
+        return getResultFromStatementInfo(stmtInfo, displayAppInfo);
+      }
+    } finally {
+      if (paragraphId != null) {
+        paragraphId2StmtProgressMap.remove(paragraphId);
+        paragraphsToCancel.remove(paragraphId);
+      }
+    }
+  }
+
+  private void cancel(int id, String paragraphId) {
+    if (livyVersion.isCancelSupported()) {
+      try {
+        LOGGER.info("Cancelling statement " + id);
+        cancelStatement(id);
+      } catch (LivyException e) {
+        LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
+      }
+      finally {
+        paragraphsToCancel.remove(paragraphId);
+      }
+    } else {
+      LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
+      paragraphsToCancel.clear();
+    }
+  }
+
+  protected LivyVersion getLivyVersion() throws LivyException {
+    return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version));
+  }
+
+  private boolean isSessionExpired() throws LivyException {
+    try {
+      getSessionInfo(sessionInfo.id);
+      return false;
+    } catch (SessionNotFoundException e) {
+      return true;
+    } catch (LivyException e) {
+      throw e;
+    }
+  }
+
+  private InterpreterResult appendSessionExpire(InterpreterResult result, boolean sessionExpired) {
+    if (sessionExpired) {
+      InterpreterResult result2 = new InterpreterResult(result.code());
+      result2.add(InterpreterResult.Type.HTML,
+          "<font color=\"red\">Previous livy session is expired, new livy session is created. " +
+              "Paragraphs that depend on this paragraph need to be re-executed!" + "</font>");
+      for (InterpreterResultMessage message : result.message()) {
+        result2.add(message.getType(), message.getData());
+      }
+      return result2;
+    } else {
+      return result;
+    }
+  }
+
+
+  private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
+                                                       boolean displayAppInfo) {
+    if (stmtInfo.output != null && stmtInfo.output.isError()) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, stmtInfo.output.evalue);
+    } else if (stmtInfo.isCancelled()) {
+      // corner case, output might be null if it is cancelled.
+      return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
+    } else if (stmtInfo.output == null) {
+      // This case should never happen, just in case
+      return new InterpreterResult(InterpreterResult.Code.ERROR, "Empty output");
+    } else {
+      //TODO(zjffdu) support other types of data (like json, image and etc)
+      String result = stmtInfo.output.data.plain_text;
+
+      // check table magic result first
+      if (stmtInfo.output.data.application_livy_table_json != null) {
+        StringBuilder outputBuilder = new StringBuilder();
+        boolean notFirstColumn = false;
+
+        for (Map header : stmtInfo.output.data.application_livy_table_json.headers) {
+          if (notFirstColumn) {
+            outputBuilder.append("\t");
+          }
+          outputBuilder.append(header.get("name"));
+          notFirstColumn = true;
+        }
+
+        outputBuilder.append("\n");
+        for (List<Object> row : stmtInfo.output.data.application_livy_table_json.records) {
+          outputBuilder.append(StringUtils.join(row, "\t"));
+          outputBuilder.append("\n");
+        }
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS,
+            InterpreterResult.Type.TABLE, outputBuilder.toString());
+      } else if (stmtInfo.output.data.image_png != null) {
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS,
+            InterpreterResult.Type.IMG, (String) stmtInfo.output.data.image_png);
+      } else if (result != null) {
+        result = result.trim();
+        if (result.startsWith("<link")
+            || result.startsWith("<script")
+            || result.startsWith("<style")
+            || result.startsWith("<div")) {
+          result = "%html " + result;
+        }
+      }
+
+      if (displayAppInfo) {
+        InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
+        interpreterResult.add(result);
+        String appInfoHtml = "<hr/>Spark Application Id: " + sessionInfo.appId + "<br/>"
+            + "Spark WebUI: <a href=\"" + sessionInfo.webUIAddress + "\">"
+            + sessionInfo.webUIAddress + "</a>";
+        interpreterResult.add(InterpreterResult.Type.HTML, appInfoHtml);
+        return interpreterResult;
+      } else {
+        return new InterpreterResult(InterpreterResult.Code.SUCCESS, result);
+      }
+    }
+  }
+
+  private StatementInfo executeStatement(ExecuteRequest executeRequest)
+      throws LivyException {
+    return StatementInfo.fromJson(callRestAPI("/sessions/" + sessionInfo.id + "/statements", "POST",
+        executeRequest.toJson()));
+  }
+
+  private StatementInfo getStatementInfo(int statementId)
+      throws LivyException {
+    return StatementInfo.fromJson(
+        callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET"));
+  }
+
+  private void cancelStatement(int statementId) throws LivyException {
+    callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST");
+  }
+
+
+  private RestTemplate createRestTemplate() {
+    HttpClient httpClient = null;
+    if (livyURL.startsWith("https:")) {
+      String keystoreFile = property.getProperty("zeppelin.livy.ssl.trustStore");
+      String password = property.getProperty("zeppelin.livy.ssl.trustStorePassword");
+      if (StringUtils.isBlank(keystoreFile)) {
+        throw new RuntimeException("No zeppelin.livy.ssl.trustStore specified for livy ssl");
+      }
+      if (StringUtils.isBlank(password)) {
+        throw new RuntimeException("No zeppelin.livy.ssl.trustStorePassword specified " +
+            "for livy ssl");
+      }
+      FileInputStream inputStream = null;
+      try {
+        inputStream = new FileInputStream(keystoreFile);
+        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+        trustStore.load(new FileInputStream(keystoreFile), password.toCharArray());
+        SSLContext sslContext = SSLContexts.custom()
+            .loadTrustMaterial(trustStore)
+            .build();
+        SSLConnectionSocketFactory csf = new SSLConnectionSocketFactory(sslContext);
+        httpClient = HttpClients.custom().setSSLSocketFactory(csf).build();
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to create SSL HttpClient", e);
+      } finally {
+        if (inputStream != null) {
+          try {
+            inputStream.close();
+          } catch (IOException e) {
+            LOGGER.error("Failed to close keystore file", e);
+          }
+        }
+      }
+    }
+
+    String keytabLocation = property.getProperty("zeppelin.livy.keytab");
+    String principal = property.getProperty("zeppelin.livy.principal");
+    if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) {
+      if (httpClient == null) {
+        return new KerberosRestTemplate(keytabLocation, principal);
+      } else {
+        return new KerberosRestTemplate(keytabLocation, principal, httpClient);
+      }
+    }
+    if (httpClient == null) {
+      return new RestTemplate();
+    } else {
+      return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient));
+    }
+  }
+
+  private String callRestAPI(String targetURL, String method) throws LivyException {
+    return callRestAPI(targetURL, method, "");
+  }
+
+  private String callRestAPI(String targetURL, String method, String jsonData)
+      throws LivyException {
+    targetURL = livyURL + targetURL;
+    LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
+    HttpHeaders headers = new HttpHeaders();
+    headers.add("Content-Type", "application/json");
+    headers.add("X-Requested-By", "zeppelin");
+    ResponseEntity<String> response = null;
+    try {
+      if (method.equals("POST")) {
+        HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
+        response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
+      } else if (method.equals("GET")) {
+        HttpEntity<String> entity = new HttpEntity<>(headers);
+        response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
+      } else if (method.equals("DELETE")) {
+        HttpEntity<String> entity = new HttpEntity<>(headers);
+        response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
+      }
+    } catch (HttpClientErrorException e) {
+      response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
+      LOGGER.error(String.format("Error with %s StatusCode: %s",
+          response.getStatusCode().value(), e.getResponseBodyAsString()));
+    } catch (RestClientException e) {
+      // Exception happens when kerberos is enabled.
+      if (e.getCause() instanceof HttpClientErrorException) {
+        HttpClientErrorException cause = (HttpClientErrorException) e.getCause();
+        if (cause.getResponseBodyAsString().matches(SESSION_NOT_FOUND_PATTERN)) {
+          throw new SessionNotFoundException(cause.getResponseBodyAsString());
+        }
+        throw new LivyException(cause.getResponseBodyAsString() + "\n"
+            + ExceptionUtils.getFullStackTrace(ExceptionUtils.getRootCause(e)));
+      }
+      throw new LivyException(e);
+    }
+    if (response == null) {
+      throw new LivyException("No http response returned");
+    }
+    LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
+        response.getBody());
+    if (response.getStatusCode().value() == 200
+        || response.getStatusCode().value() == 201) {
+      return response.getBody();
+    } else if (response.getStatusCode().value() == 404) {
+      if (response.getBody().matches(SESSION_NOT_FOUND_PATTERN)) {
+        throw new SessionNotFoundException(response.getBody());
+      } else {
+        throw new APINotFoundException("No rest api found for " + targetURL +
+            ", " + response.getStatusCode());
+      }
+    } else {
+      String responseString = response.getBody();
+      if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
+        return responseString;
+      }
+      throw new LivyException(String.format("Error with %s StatusCode: %s",
+          response.getStatusCode().value(), responseString));
+    }
+  }
+
+  private void closeSession(int sessionId) {
+    try {
+      callRestAPI("/sessions/" + sessionId, "DELETE");
+    } catch (Exception e) {
+      LOGGER.error(String.format("Error closing session for user with session ID: %s",
+          sessionId), e);
+    }
+  }
+
+  /*
+  * We create these POJO here to accommodate livy 0.3 which is not released yet. livy rest api has
+  * some changes from version to version. So we create these POJO in zeppelin side to accommodate
+  * incompatibility between versions. Later, when livy become more stable, we could just depend on
+  * livy client jar.
+  */
+  private static class CreateSessionRequest {
+    public final String kind;
+    @SerializedName("proxyUser")
+    public final String user;
+    public final Map<String, String> conf;
+
+    public CreateSessionRequest(String kind, String user, Map<String, String> conf) {
+      this.kind = kind;
+      this.user = user;
+      this.conf = conf;
+    }
+
+    public String toJson() {
+      return gson.toJson(this);
+    }
+  }
+
+  /**
+   *
+   */
+  public static class SessionInfo {
+
+    public final int id;
+    public String appId;
+    public String webUIAddress;
+    public final String owner;
+    public final String proxyUser;
+    public final String state;
+    public final String kind;
+    public final Map<String, String> appInfo;
+    public final List<String> log;
+
+    public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
+                       String kind, Map<String, String> appInfo, List<String> log) {
+      this.id = id;
+      this.appId = appId;
+      this.owner = owner;
+      this.proxyUser = proxyUser;
+      this.state = state;
+      this.kind = kind;
+      this.appInfo = appInfo;
+      this.log = log;
+    }
+
+    public boolean isReady() {
+      return state.equals("idle");
+    }
+
+    public boolean isFinished() {
+      return state.equals("error") || state.equals("dead") || state.equals("success");
+    }
+
+    public static SessionInfo fromJson(String json) {
+      return gson.fromJson(json, SessionInfo.class);
+    }
+  }
+
+  private static class ExecuteRequest {
+    public final String code;
+
+    public ExecuteRequest(String code) {
+      this.code = code;
+    }
+
+    public String toJson() {
+      return gson.toJson(this);
+    }
+  }
+
+  private static class StatementInfo {
+    public Integer id;
+    public String state;
+    public double progress;
+    public StatementOutput output;
+
+    public StatementInfo() {
+    }
+
+    public static StatementInfo fromJson(String json) {
+      return gson.fromJson(json, StatementInfo.class);
+    }
+
+    public boolean isAvailable() {
+      return state.equals("available") || state.equals("cancelled");
+    }
+
+    public boolean isCancelled() {
+      return state.equals("cancelled");
+    }
+
+    private static class StatementOutput {
+      public String status;
+      public String execution_count;
+      public Data data;
+      public String ename;
+      public String evalue;
+      public Object traceback;
+      public TableMagic tableMagic;
+
+      public boolean isError() {
+        return status.equals("error");
+      }
+
+      public String toJson() {
+        return gson.toJson(this);
+      }
+
+      private static class Data {
+        @SerializedName("text/plain")
+        public String plain_text;
+        @SerializedName("image/png")
+        public String image_png;
+        @SerializedName("application/json")
+        public String application_json;
+        @SerializedName("application/vnd.livy.table.v1+json")
+        public TableMagic application_livy_table_json;
+      }
+
+      private static class TableMagic {
+        @SerializedName("headers")
+        List<Map> headers;
+
+        @SerializedName("data")
+        List<List> records;
+      }
+    }
+  }
+
+  private static class LivyVersionResponse {
+    public String url;
+    public String branch;
+    public String revision;
+    public String version;
+    public String date;
+    public String user;
+
+    public static LivyVersionResponse fromJson(String json) {
+      return gson.fromJson(json, LivyVersionResponse.class);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java
index f1f3538..174c2c0 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySpark3Interpreter.java
@@ -17,16 +17,6 @@
 
 package org.apache.zeppelin.livy;
 
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java
index c0de234..17b20e3 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkBaseInterpreter.java
@@ -23,7 +23,7 @@ import java.util.Properties;
 /**
  * Base class for PySpark Interpreter
  */
-public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterprereter {
+public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterpreter {
 
   public LivyPySparkBaseInterpreter(Properties property) {
     super(property);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java
index b5bf106..d664bbe 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyPySparkInterpreter.java
@@ -17,16 +17,6 @@
 
 package org.apache.zeppelin.livy;
 
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
index f3a5eab..606ef64 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
@@ -17,23 +17,12 @@
 
 package org.apache.zeppelin.livy;
 
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Livy Spark interpreter for Zeppelin.
  */
-public class LivySparkInterpreter extends BaseLivyInterprereter {
+public class LivySparkInterpreter extends BaseLivyInterpreter {
 
   public LivySparkInterpreter(Properties property) {
     super(property);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java
index 9bd24b7..c270437 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkRInterpreter.java
@@ -17,23 +17,13 @@
 
 package org.apache.zeppelin.livy;
 
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 
 /**
  * Livy PySpark interpreter for Zeppelin.
  */
-public class LivySparkRInterpreter extends BaseLivyInterprereter {
+public class LivySparkRInterpreter extends BaseLivyInterpreter {
 
   public LivySparkRInterpreter(Properties property) {
     super(property);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 9c0d359..d132b5b 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -30,7 +30,7 @@ import java.util.Properties;
 /**
  * Livy SparkSQL Interpreter for Zeppelin.
  */
-public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
+public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
 
   public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
       "zeppelin.livy.spark.sql.field.truncate";

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55cb6b89/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 007c0ed..6db75e9 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -761,7 +761,7 @@ public class LivyInterpreterIT {
     }
   }
 
-  private boolean isSpark2(BaseLivyInterprereter interpreter, InterpreterContext context) {
+  private boolean isSpark2(BaseLivyInterpreter interpreter, InterpreterContext context) {
     InterpreterResult result = null;
     if (interpreter instanceof LivySparkRInterpreter) {
       result = interpreter.interpret("sparkR.session()", context);