Posted to commits@livy.apache.org by js...@apache.org on 2017/08/31 13:08:02 UTC

[1/2] incubator-livy git commit: [LIVY-194][REPL] Add shared language support for Livy interactive session

Repository: incubator-livy
Updated Branches:
  refs/heads/master 317290d4b -> c1aafeb6c


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java b/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java
new file mode 100644
index 0000000..c28bac9
--- /dev/null
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.driver;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.hive.HiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkEntries {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkEntries.class);
+
+  private volatile JavaSparkContext sc;
+  private final SparkConf conf;
+  private volatile SQLContext sqlctx;
+  private volatile HiveContext hivectx;
+  private volatile Object sparksession;
+
+  public SparkEntries(SparkConf conf) {
+    this.conf = conf;
+  }
+
+  public JavaSparkContext sc() {
+    if (sc == null) {
+      synchronized (this) {
+        if (sc == null) {
+          long t1 = System.nanoTime();
+          LOG.info("Starting Spark context...");
+          SparkContext scalaSc = SparkContext.getOrCreate(conf);
+          sc = new JavaSparkContext(scalaSc);
+          LOG.info("Spark context finished initialization in {}ms",
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1));
+        }
+      }
+    }
+    return sc;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Object sparkSession() throws Exception {
+    if (sparksession == null) {
+      synchronized (this) {
+        if (sparksession == null) {
+          try {
+            Class<?> clz = Class.forName("org.apache.spark.sql.SparkSession$");
+            Object spark = clz.getField("MODULE$").get(null);
+            Method m = clz.getMethod("builder");
+            Object builder = m.invoke(spark);
+            builder.getClass().getMethod("sparkContext", SparkContext.class)
+              .invoke(builder, sc().sc());
+
+            SparkConf conf = sc().getConf();
+            if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase()
+              .equals("hive")) {
+              if ((boolean) clz.getMethod("hiveClassesArePresent").invoke(spark)) {
+                ClassLoader loader = Thread.currentThread().getContextClassLoader() != null ?
+                  Thread.currentThread().getContextClassLoader() : getClass().getClassLoader();
+                if (loader.getResource("hive-site.xml") == null) {
+                  LOG.warn("livy.repl.enable-hive-context is true but no hive-site.xml found on " +
+                   "classpath");
+                }
+
+                builder.getClass().getMethod("enableHiveSupport").invoke(builder);
+                sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder);
+                LOG.info("Created Spark session (with Hive support).");
+              } else {
+                builder.getClass().getMethod("config", String.class, String.class)
+                  .invoke(builder, "spark.sql.catalogImplementation", "in-memory");
+                sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder);
+                LOG.info("Created Spark session.");
+              }
+            } else {
+              sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder);
+              LOG.info("Created Spark session.");
+            }
+          } catch (Exception e) {
+            LOG.warn("SparkSession is not supported", e);
+            throw e;
+          }
+        }
+      }
+    }
+
+    return sparksession;
+  }
+
+  public SQLContext sqlctx() {
+    if (sqlctx == null) {
+      synchronized (this) {
+        if (sqlctx == null) {
+          sqlctx = new SQLContext(sc());
+          LOG.info("Created SQLContext.");
+        }
+      }
+    }
+    return sqlctx;
+  }
+
+  public HiveContext hivectx() {
+    if (hivectx == null) {
+      synchronized (this) {
+        if (hivectx == null) {
+          SparkConf conf = sc.getConf();
+          if (conf.getBoolean("spark.repl.enableHiveContext", false) ||
+            conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase()
+              .equals("hive")) {
+            ClassLoader loader = Thread.currentThread().getContextClassLoader() != null ?
+              Thread.currentThread().getContextClassLoader() : getClass().getClassLoader();
+            if (loader.getResource("hive-site.xml") == null) {
+              LOG.warn("livy.repl.enable-hive-context is true but no hive-site.xml found on " +
+               "classpath.");
+            }
+            hivectx = new HiveContext(sc().sc());
+            LOG.info("Created HiveContext.");
+          }
+        }
+      }
+    }
+    return hivectx;
+  }
+
+  public synchronized void stop() {
+    if (sc != null) {
+      sc.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
index 0663822..cacf943 100644
--- a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
+++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
@@ -28,6 +28,7 @@ import java.util.concurrent.*;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.Test;
@@ -76,6 +77,8 @@ public class TestSparkClient {
     }
 
     conf.put(LIVY_JARS.key(), "");
+    conf.put("spark.repl.enableHiveContext", "true");
+    conf.put("spark.sql.catalogImplementation", "hive");
     return conf;
   }
 
@@ -406,7 +409,7 @@ public class TestSparkClient {
         Serializer s = new Serializer();
         RSCClient lclient = (RSCClient) client;
         ByteBuffer job = s.serialize(new Echo<>("hello"));
-        String jobId = lclient.bypass(job, sync);
+        String jobId = lclient.bypass(job, "spark", sync);
 
         // Try to fetch the result, trying several times until the timeout runs out, and
         // backing off as attempts fail.
@@ -485,6 +488,16 @@ public class TestSparkClient {
       if (client != null) {
         client.stop(true);
       }
+
+      File derbyLog = new File("derby.log");
+      if (derbyLog.exists()) {
+        derbyLog.delete();
+      }
+
+      File metaStore = new File("metastore_db");
+      if (metaStore.exists()) {
+        FileUtils.deleteDirectory(metaStore);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
index bbb7abd..792c59f 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
@@ -17,10 +17,10 @@
 
 package org.apache.livy.server.interactive
 
-import org.apache.livy.sessions.{Kind, Spark}
+import org.apache.livy.sessions.{Kind, Shared}
 
 class CreateInteractiveRequest {
-  var kind: Kind = Spark()
+  var kind: Kind = Shared()
   var proxyUser: Option[String] = None
   var jars: List[String] = List()
   var pyFiles: List[String] = List()
@@ -37,8 +37,7 @@ class CreateInteractiveRequest {
   var heartbeatTimeoutInSecond: Int = 0
 
   override def toString: String = {
-    s"[kind: $kind, " +
-      s"proxyUser: $proxyUser, " +
+    s"[kind: $kind, proxyUser: $proxyUser, " +
       (if (jars.nonEmpty) s"jars: ${jars.mkString(",")}, " else "") +
       (if (pyFiles.nonEmpty) s"pyFiles: ${pyFiles.mkString(",")}, " else "") +
       (if (files.nonEmpty) s"files: ${files.mkString(",")}, " else "") +

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 207cc04..0462e80 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -296,18 +296,19 @@ object InteractiveSession extends Logging {
       }
     }
 
-    kind match {
-      case PySpark() | PySpark3() =>
-        val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil
-        mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
-        builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
-      case SparkR() =>
-        val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
-        sparkRArchive.foreach { archive =>
-          builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
-        }
-      case _ =>
+    val pySparkFiles = if (!LivyConf.TEST_MODE) {
+      builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
+      findPySparkArchives()
+    } else {
+      Nil
     }
+    mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
+
+    val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
+    sparkRArchive.foreach { archive =>
+      builderProperties.put(RSCConf.Entry.SPARKR_PACKAGE.key(), archive + "#sparkr")
+    }
+
     builderProperties.put(RSCConf.Entry.SESSION_KIND.key, kind.toString)
 
     // Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set.
@@ -490,7 +491,7 @@ class InteractiveSession(
     ensureRunning()
     recordActivity()
 
-    val id = client.get.submitReplCode(content.code).get
+    val id = client.get.submitReplCode(content.code, content.kind.orNull).get
     client.get.getReplJobResults(id, 1).get().statements(0)
   }
 
@@ -500,12 +501,12 @@ class InteractiveSession(
     client.get.cancelReplCode(statementId)
   }
 
-  def runJob(job: Array[Byte]): Long = {
-    performOperation(job, true)
+  def runJob(job: Array[Byte], jobType: String): Long = {
+    performOperation(job, jobType, true)
   }
 
-  def submitJob(job: Array[Byte]): Long = {
-    performOperation(job, false)
+  def submitJob(job: Array[Byte], jobType: String): Long = {
+    performOperation(job, jobType, false)
   }
 
   def addFile(fileStream: InputStream, fileName: String): Unit = {
@@ -569,10 +570,10 @@ class InteractiveSession(
     }
   }
 
-  private def performOperation(job: Array[Byte], sync: Boolean): Long = {
+  private def performOperation(job: Array[Byte], jobType: String, sync: Boolean): Long = {
     ensureActive()
     recordActivity()
-    val future = client.get.bypass(ByteBuffer.wrap(job), sync)
+    val future = client.get.bypass(ByteBuffer.wrap(job), jobType, sync)
     val opId = operationCounter.incrementAndGet()
     operations(opId) = future
     opId

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 900c826..3800856 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -153,7 +153,7 @@ class InteractiveSessionServlet(
     withModifyAccessSession { session =>
       try {
       require(req.job != null && req.job.length > 0, "no job provided.")
-      val jobId = session.submitJob(req.job)
+      val jobId = session.submitJob(req.job, req.jobType)
       Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
       } catch {
         case e: Throwable =>
@@ -165,7 +165,7 @@ class InteractiveSessionServlet(
   jpost[SerializedJob]("/:id/run-job") { req =>
     withModifyAccessSession { session =>
       require(req.job != null && req.job.length > 0, "no job provided.")
-      val jobId = session.runJob(req.job)
+      val jobId = session.runJob(req.job, req.jobType)
       Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
     }
   }
@@ -211,10 +211,7 @@ class InteractiveSessionServlet(
 
   jpost[AddResource]("/:id/add-pyfile") { req =>
     withModifyAccessSession { lsession =>
-      lsession.kind match {
-        case PySpark() | PySpark3() => addJarOrPyFile(req, lsession)
-        case _ => BadRequest("Only supported for pyspark sessions.")
-      }
+      addJarOrPyFile(req, lsession)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/server/src/test/resources/log4j.properties b/server/src/test/resources/log4j.properties
index 9c8586e..9195bd9 100644
--- a/server/src/test/resources/log4j.properties
+++ b/server/src/test/resources/log4j.properties
@@ -17,7 +17,7 @@
 
 # Set everything to be logged to the file target/unit-tests.log
 test.appender=file
-log4j.rootCategory=DEBUG, ${test.appender}
+log4j.rootCategory=INFO, ${test.appender}
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index 1d72062..e1de22e 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -118,7 +118,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
       batch should be (defined)
     }
 
-    jpost[Map[String, Any]]("/0/statements", ExecuteRequest("foo")) { data =>
+    jpost[Map[String, Any]]("/0/statements", ExecuteRequest("foo", Some("spark"))) { data =>
       data("id") should be (0)
       data("code") shouldBe "1+1"
       data("progress") should be (0.0)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index d2ae9ae..9943c00 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -71,8 +71,8 @@ class InteractiveSessionSpec extends FunSpec
     InteractiveSession.create(0, null, None, livyConf, req, sessionStore, mockApp)
   }
 
-  private def executeStatement(code: String): JValue = {
-    val id = session.executeStatement(ExecuteRequest(code)).id
+  private def executeStatement(code: String, codeType: Option[String] = None): JValue = {
+    val id = session.executeStatement(ExecuteRequest(code, codeType)).id
     eventually(timeout(30 seconds), interval(100 millis)) {
       val s = session.getStatement(id).get
       s.state.get() shouldBe StatementState.Available
@@ -154,15 +154,10 @@ class InteractiveSessionSpec extends FunSpec
       properties1(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars1
     }
 
-    it("should start in the idle state") {
-      session = createSession()
-      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
-    }
-
     it("should update appId and appInfo and session store") {
       val mockApp = mock[SparkApp]
       val sessionStore = mock[SessionStore]
-      val session = createSession(sessionStore, Some(mockApp))
+      session = createSession(sessionStore, Some(mockApp))
 
       val expectedAppId = "APPID"
       session.appIdKnown(expectedAppId)
@@ -174,26 +169,38 @@ class InteractiveSessionSpec extends FunSpec
 
       verify(sessionStore, atLeastOnce()).save(
         MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject())
+
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
     }
 
     withSession("should execute `1 + 2` == 3") { session =>
-      val result = executeStatement("1 + 2")
-      val expectedResult = Extraction.decompose(Map(
+      val pyResult = executeStatement("1 + 2", Some("pyspark"))
+      pyResult should equal (Extraction.decompose(Map(
         "status" -> "ok",
         "execution_count" -> 0,
-        "data" -> Map(
-          "text/plain" -> "3"
-        )
-      ))
+        "data" -> Map("text/plain" -> "3")))
+      )
 
-      result should equal (expectedResult)
+      val scalaResult = executeStatement("1 + 2", Some("spark"))
+      scalaResult should equal (Extraction.decompose(Map(
+        "status" -> "ok",
+        "execution_count" -> 1,
+        "data" -> Map("text/plain" -> "res0: Int = 3")))
+      )
+
+      val rResult = executeStatement("1 + 2", Some("sparkr"))
+      rResult should equal (Extraction.decompose(Map(
+        "status" -> "ok",
+        "execution_count" -> 2,
+        "data" -> Map("text/plain" -> "[1] 3")))
+      )
     }
 
     withSession("should report an error if accessing an unknown variable") { session =>
       val result = executeStatement("x")
       val expectedResult = Extraction.decompose(Map(
         "status" -> "error",
-        "execution_count" -> 1,
+        "execution_count" -> 3,
         "ename" -> "NameError",
         "evalue" -> "name 'x' is not defined",
         "traceback" -> List(
@@ -214,7 +221,7 @@ class InteractiveSessionSpec extends FunSpec
           |from time import sleep
           |sleep(3)
         """.stripMargin
-      val statement = session.executeStatement(ExecuteRequest(code))
+      val statement = session.executeStatement(ExecuteRequest(code, None))
       statement.progress should be (0.0)
 
       eventually(timeout(10 seconds), interval(100 millis)) {
@@ -225,7 +232,7 @@ class InteractiveSessionSpec extends FunSpec
     }
 
     withSession("should error out the session if the interpreter dies") { session =>
-      session.executeStatement(ExecuteRequest("import os; os._exit(666)"))
+      session.executeStatement(ExecuteRequest("import os; os._exit(666)", None))
       eventually(timeout(30 seconds), interval(100 millis)) {
         session.state shouldBe a[SessionState.Error]
       }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
index 6c0ff40..d575b54 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
@@ -75,7 +75,7 @@ class JobApiSpec extends BaseInteractiveServletSpec {
 
     withSessionId("should handle synchronous jobs") { testJobSubmission(_, true) }
 
-    // Test that the file does get copied over to the live home dir on HDFS - does not test end
+    // Test that the file does get copied over to the livy home dir on HDFS - does not test end
     // to end that the RSCClient class copies it over to the app.
     withSessionId("should support file uploads") { id =>
       testResourceUpload("file", id)
@@ -89,7 +89,7 @@ class JobApiSpec extends BaseInteractiveServletSpec {
       val ser = new Serializer()
       val job = BufferUtils.toByteArray(ser.serialize(new Echo("hello")))
       var jobId: Long = -1L
-      jpost[JobStatus](s"/$sid/submit-job", new SerializedJob(job)) { status =>
+      jpost[JobStatus](s"/$sid/submit-job", new SerializedJob(job, "spark")) { status =>
         jobId = status.id
       }
 
@@ -211,7 +211,7 @@ class JobApiSpec extends BaseInteractiveServletSpec {
     val jobData = BufferUtils.toByteArray(ser.serialize(job))
     val route = if (sync) s"/$sid/submit-job" else s"/$sid/run-job"
     var jobId: Long = -1L
-    jpost[JobStatus](route, new SerializedJob(jobData), headers = headers) { data =>
+    jpost[JobStatus](route, new SerializedJob(jobData, "spark"), headers = headers) { data =>
       jobId = data.id
     }
 


[2/2] incubator-livy git commit: [LIVY-194][REPL] Add shared language support for Livy interactive session

Posted by js...@apache.org.
[LIVY-194][REPL] Add shared language support for Livy interactive session

This is ongoing work. Putting it here to leverage Travis for testing.

In the current Livy, when we create a new session we need to specify a session kind, which represents the interpreter kind bound to that session. The user can choose `spark`, `pyspark`, or `sparkr`, and Livy internally creates the corresponding interpreter for the session. Also, in Livy each session represents a complete Spark application / SparkContext.

This brings a limitation: the user can only choose one language, bound to its own Spark application. Furthermore, data generated by the `pyspark` interpreter cannot be shared with the `spark` interpreter. So, to improve the usability of Livy, we propose adding support for multiple interpreters within one session.
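
As a quick sketch of the intended usage: with a shared session the language is chosen per statement instead of per session. The snippet below is adapted from the test changes in this diff (`ExecuteRequest` now carries an optional code kind, and `CreateInteractiveRequest` defaults to `Shared()`); the `session` handle is only illustrative, not part of this change.

    // The session kind now defaults to Shared(), so no single language is fixed at creation time.
    val req = new CreateInteractiveRequest()

    // Each statement carries an optional code kind ("spark", "pyspark" or "sparkr");
    // the repl-side Session routes it to the matching interpreter, creating it lazily.
    session.executeStatement(ExecuteRequest("1 + 2", Some("spark")))    // Scala interpreter
    session.executeStatement(ExecuteRequest("1 + 2", Some("pyspark")))  // Python interpreter
    session.executeStatement(ExecuteRequest("1 + 2", Some("sparkr")))   // R interpreter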

Author: jerryshao <ss...@hortonworks.com>

Closes #28 from jerryshao/LIVY-194.

Change-Id: I743871c80ccb5c16101236e052d5f31662382667


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/c1aafeb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/c1aafeb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/c1aafeb6

Branch: refs/heads/master
Commit: c1aafeb6cb87f2bd7f4cb7cf538822b59fb34a9c
Parents: 317290d
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Aug 31 21:07:42 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Aug 31 21:07:42 2017 +0800

----------------------------------------------------------------------
 .../apache/livy/client/common/HttpMessages.java |   6 +-
 .../apache/livy/client/http/JobHandleImpl.java  |   2 +-
 .../livy/client/http/HttpClientSpec.scala       |   2 +-
 core/src/main/scala/org/apache/livy/msgs.scala  |   2 +-
 .../scala/org/apache/livy/sessions/Kind.scala   |  10 +-
 integration-test/pom.xml                        |   2 +-
 python-api/src/main/python/livy/client.py       |   4 +-
 .../org/apache/livy/repl/SparkInterpreter.scala |  28 ++--
 .../org/apache/livy/repl/SparkInterpreter.scala |  26 ++--
 repl/src/main/resources/fake_shell.py           |  34 ++++-
 .../livy/repl/AbstractSparkInterpreter.scala    |  52 +++++++
 .../org/apache/livy/repl/BypassPySparkJob.scala |   7 +-
 .../org/apache/livy/repl/Interpreter.scala      |   5 +-
 .../apache/livy/repl/ProcessInterpreter.scala   |  10 +-
 .../apache/livy/repl/PythonInterpreter.scala    |  25 ++--
 .../scala/org/apache/livy/repl/ReplDriver.scala |  48 +++---
 .../scala/org/apache/livy/repl/Session.scala    | 120 ++++++++++-----
 .../livy/repl/SparkContextInitializer.scala     | 126 ----------------
 .../apache/livy/repl/SparkRInterpreter.scala    |  66 ++++----
 .../org/apache/livy/repl/BaseSessionSpec.scala  |  20 +--
 .../livy/repl/PythonInterpreterSpec.scala       |  24 ++-
 .../apache/livy/repl/PythonSessionSpec.scala    |  21 ++-
 .../org/apache/livy/repl/ReplDriverSuite.scala  |   2 +-
 .../org/apache/livy/repl/SessionSpec.scala      |  61 ++++----
 .../apache/livy/repl/SharedSessionSpec.scala    | 126 ++++++++++++++++
 .../livy/repl/SparkRInterpreterSpec.scala       |  10 +-
 .../apache/livy/repl/SparkRSessionSpec.scala    |   7 +-
 .../org/apache/livy/repl/SparkSessionSpec.scala |   7 +-
 .../java/org/apache/livy/rsc/BaseProtocol.java  |  12 +-
 .../org/apache/livy/rsc/ContextLauncher.java    |   8 +-
 .../java/org/apache/livy/rsc/RSCClient.java     |  13 +-
 .../apache/livy/rsc/driver/JobContextImpl.java  |  65 ++------
 .../org/apache/livy/rsc/driver/RSCDriver.java   |  17 +--
 .../apache/livy/rsc/driver/SparkEntries.java    | 149 +++++++++++++++++++
 .../org/apache/livy/rsc/TestSparkClient.java    |  15 +-
 .../interactive/CreateInteractiveRequest.scala  |   7 +-
 .../server/interactive/InteractiveSession.scala |  37 ++---
 .../interactive/InteractiveSessionServlet.scala |   9 +-
 server/src/test/resources/log4j.properties      |   2 +-
 .../InteractiveSessionServletSpec.scala         |   2 +-
 .../interactive/InteractiveSessionSpec.scala    |  43 +++---
 .../livy/server/interactive/JobApiSpec.scala    |   6 +-
 42 files changed, 738 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
----------------------------------------------------------------------
diff --git a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index 99ce900..b1e253f 100644
--- a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++ b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -82,13 +82,15 @@ public class HttpMessages {
   public static class SerializedJob implements ClientMessage {
 
     public final byte[] job;
+    public final String jobType;
 
-    public SerializedJob(byte[] job) {
+    public SerializedJob(byte[] job, String jobType) {
       this.job = job;
+      this.jobType = jobType;
     }
 
     private SerializedJob() {
-      this(null);
+      this(null, null);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java
index f0ffb59..d39dfe9 100644
--- a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java
+++ b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java
@@ -138,7 +138,7 @@ class JobHandleImpl<T> extends AbstractJobHandle<T> {
       @Override
       public void run() {
         try {
-          ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob));
+          ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark");
           JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command);
 
           if (isCancelPending) {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
----------------------------------------------------------------------
diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index 4984eae..5ea6f8d 100644
--- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -217,7 +217,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
 
   private def runJob(sync: Boolean, genStatusFn: Long => Seq[JobStatus]): (Long, JFuture[Int]) = {
     val jobId = java.lang.Long.valueOf(ID_GENERATOR.incrementAndGet())
-    when(session.submitJob(any(classOf[Array[Byte]]))).thenReturn(jobId)
+    when(session.submitJob(any(classOf[Array[Byte]]), anyString())).thenReturn(jobId)
 
     val statuses = genStatusFn(jobId)
     val first = statuses.head

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/core/src/main/scala/org/apache/livy/msgs.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/msgs.scala b/core/src/main/scala/org/apache/livy/msgs.scala
index 0dd0a26..048aa7f 100644
--- a/core/src/main/scala/org/apache/livy/msgs.scala
+++ b/core/src/main/scala/org/apache/livy/msgs.scala
@@ -28,7 +28,7 @@ case class Msg[T <: Content](msg_type: MsgType, content: T)
 
 sealed trait Content
 
-case class ExecuteRequest(code: String) extends Content {
+case class ExecuteRequest(code: String, kind: Option[String]) extends Content {
   val msg_type = MsgType.execute_request
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/core/src/main/scala/org/apache/livy/sessions/Kind.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/livy/sessions/Kind.scala b/core/src/main/scala/org/apache/livy/sessions/Kind.scala
index bfb166f..5e7fa00 100644
--- a/core/src/main/scala/org/apache/livy/sessions/Kind.scala
+++ b/core/src/main/scala/org/apache/livy/sessions/Kind.scala
@@ -30,21 +30,21 @@ case class PySpark() extends Kind {
   override def toString: String = "pyspark"
 }
 
-case class PySpark3() extends Kind {
-  override def toString: String = "pyspark3"
-}
-
 case class SparkR() extends Kind {
   override def toString: String = "sparkr"
 }
 
+case class Shared() extends Kind {
+  override def toString: String = "shared"
+}
+
 object Kind {
 
   def apply(kind: String): Kind = kind match {
     case "spark" | "scala" => Spark()
     case "pyspark" | "python" => PySpark()
-    case "pyspark3" | "python3" => PySpark3()
     case "sparkr" | "r" => SparkR()
+    case "shared" => Shared()
     case other => throw new IllegalArgumentException(s"Invalid kind: $other")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/integration-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index e15c470..efb8b38 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -286,7 +286,7 @@
         <configuration>
           <environmentVariables>
             <LIVY_HOME>${execution.root}</LIVY_HOME>
-            <LIVY_TEST>false</LIVY_TEST>
+            <LIVY_TEST>true</LIVY_TEST>
             <LIVY_INTEGRATION_TEST>true</LIVY_INTEGRATION_TEST>
           </environmentVariables>
           <systemProperties>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/python-api/src/main/python/livy/client.py
----------------------------------------------------------------------
diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py
index 4a5a985..99f88f8 100644
--- a/python-api/src/main/python/livy/client.py
+++ b/python-api/src/main/python/livy/client.py
@@ -70,6 +70,7 @@ class HttpClient(object):
         uri = urlparse(url)
         self._config = ConfigParser()
         self._load_config(load_defaults, conf_dict)
+        self._job_type = 'pyspark'
         match = re.match(r'(.*)/sessions/([0-9]+)', uri.path)
         if match:
             base = ParseResult(scheme=uri.scheme, netloc=uri.netloc,
@@ -395,7 +396,8 @@ class HttpClient(object):
     def _send_job(self, command, job):
         pickled_job = cloudpickle.dumps(job)
         base64_pickled_job = base64.b64encode(pickled_job).decode('utf-8')
-        base64_pickled_job_data = {'job': base64_pickled_job}
+        base64_pickled_job_data = \
+            {'job': base64_pickled_job, 'jobType': self._job_type}
         handle = JobHandle(self._conn, self._session_id,
             self._executor)
         handle._start(command, base64_pickled_job_data)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
index f5b5b32..39009b2 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -26,20 +26,20 @@ import scala.tools.nsc.interpreter.JPrintWriter
 import scala.tools.nsc.interpreter.Results.Result
 import scala.util.{Failure, Success, Try}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkIMain
 
+import org.apache.livy.rsc.driver.SparkEntries
+
 /**
  * This represents a Spark interpreter. It is not thread safe.
  */
-class SparkInterpreter(conf: SparkConf)
-  extends AbstractSparkInterpreter with SparkContextInitializer {
+class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter {
 
   private var sparkIMain: SparkIMain = _
-  protected var sparkContext: SparkContext = _
 
-  override def start(): SparkContext = {
-    require(sparkIMain == null && sparkContext == null)
+  override def start(): Unit = {
+    require(sparkIMain == null)
 
     val settings = new Settings()
     settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
@@ -103,23 +103,21 @@ class SparkInterpreter(conf: SparkConf)
         }
       }
 
-      createSparkContext(conf)
+      postStart()
     }
-
-    sparkContext
   }
 
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+  override protected def bind(name: String,
+      tpe: String,
+      value: Object,
+      modifier: List[String]): Unit = {
     sparkIMain.beQuietDuring {
       sparkIMain.bind(name, tpe, value, modifier)
     }
   }
 
   override def close(): Unit = synchronized {
-    if (sparkContext != null) {
-      sparkContext.stop()
-      sparkContext = null
-    }
+    super.close()
 
     if (sparkIMain != null) {
       sparkIMain.close()
@@ -128,7 +126,7 @@ class SparkInterpreter(conf: SparkConf)
   }
 
   override protected def isStarted(): Boolean = {
-    sparkContext != null && sparkIMain != null
+    sparkIMain != null
   }
 
   override protected def interpret(code: String): Result = {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
index 94cd241..9d19ef3 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -26,20 +26,20 @@ import scala.tools.nsc.interpreter.JPrintWriter
 import scala.tools.nsc.interpreter.Results.Result
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
 
+import org.apache.livy.rsc.driver.SparkEntries
+
 /**
  * Scala 2.11 version of SparkInterpreter
  */
-class SparkInterpreter(conf: SparkConf)
-  extends AbstractSparkInterpreter with SparkContextInitializer {
+class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter {
 
-  protected var sparkContext: SparkContext = _
   private var sparkILoop: SparkILoop = _
   private var sparkHttpServer: Object = _
 
-  override def start(): SparkContext = {
+  override def start(): Unit = {
     require(sparkILoop == null)
 
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
@@ -89,17 +89,12 @@ class SparkInterpreter(conf: SparkConf)
         }
       }
 
-      createSparkContext(conf)
+      postStart()
     }
-
-    sparkContext
   }
 
   override def close(): Unit = synchronized {
-    if (sparkContext != null) {
-      sparkContext.stop()
-      sparkContext = null
-    }
+    super.close()
 
     if (sparkILoop != null) {
       sparkILoop.closeInterpreter()
@@ -115,7 +110,7 @@ class SparkInterpreter(conf: SparkConf)
   }
 
   override protected def isStarted(): Boolean = {
-    sparkContext != null && sparkILoop != null
+    sparkILoop != null
   }
 
   override protected def interpret(code: String): Result = {
@@ -127,7 +122,10 @@ class SparkInterpreter(conf: SparkConf)
     Option(sparkILoop.lastRequest.lineRep.call("$result"))
   }
 
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+  override protected def bind(name: String,
+      tpe: String,
+      value: Object,
+      modifier: List[String]): Unit = {
     sparkILoop.beQuietDuring {
       sparkILoop.bind(name, tpe, value, modifier)
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/resources/fake_shell.py
----------------------------------------------------------------------
diff --git a/repl/src/main/resources/fake_shell.py b/repl/src/main/resources/fake_shell.py
index 534e0df..523f7a0 100644
--- a/repl/src/main/resources/fake_shell.py
+++ b/repl/src/main/resources/fake_shell.py
@@ -531,14 +531,42 @@ def main():
         listening_port = 0
         if os.environ.get("LIVY_TEST") != "true":
             #Load spark into the context
-            exec('from pyspark.shell import sc', global_dict)
-            exec('from pyspark.shell import sqlContext', global_dict)
             exec('from pyspark.sql import HiveContext', global_dict)
             exec('from pyspark.streaming import StreamingContext', global_dict)
             exec('import pyspark.cloudpickle as cloudpickle', global_dict)
 
+            from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+            from pyspark.conf import SparkConf
+            from pyspark.context import SparkContext
+            from pyspark.sql import SQLContext, HiveContext, Row
+            # Connect to the gateway
+            gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
+            gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)
+
+            # Import the classes used by PySpark
+            java_import(gateway.jvm, "org.apache.spark.SparkConf")
+            java_import(gateway.jvm, "org.apache.spark.api.java.*")
+            java_import(gateway.jvm, "org.apache.spark.api.python.*")
+            java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
+            java_import(gateway.jvm, "org.apache.spark.sql.*")
+            java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+            java_import(gateway.jvm, "scala.Tuple2")
+
+            jsc = gateway.entry_point.sc()
+            jconf = gateway.entry_point.sc().getConf()
+            jsqlc = gateway.entry_point.hivectx() if gateway.entry_point.hivectx() is not None \
+                else gateway.entry_point.sqlctx()
+
+            conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
+            sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+            global_dict['sc'] = sc
+            sqlc = SQLContext(sc, jsqlc)
+            global_dict['sqlContext'] = sqlc
+
             if spark_major_version >= "2":
-                exec('from pyspark.shell import spark', global_dict)
+                from pyspark.sql import SparkSession
+                spark_session = SparkSession(sc, gateway.entry_point.sparkSession())
+                global_dict['spark'] = spark_session
             else:
                 # LIVY-294, need to check whether HiveContext can work properly,
                 # fallback to SQLContext if HiveContext can not be initialized successfully.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
index cf2cf35..b058d00 100644
--- a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream
 
 import scala.tools.nsc.interpreter.Results
 
+import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.json4s.DefaultFormats
 import org.json4s.Extraction
@@ -28,6 +29,7 @@ import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 
 import org.apache.livy.Logging
+import org.apache.livy.rsc.driver.SparkEntries
 
 object AbstractSparkInterpreter {
   private[repl] val KEEP_NEWLINE_REGEX = """(?<=\n)""".r
@@ -41,6 +43,10 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
 
   protected val outputStream = new ByteArrayOutputStream()
 
+  protected var entries: SparkEntries = _
+
+  def sparkEntries(): SparkEntries = entries
+
   final def kind: String = "spark"
 
   protected def isStarted(): Boolean
@@ -49,6 +55,52 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
 
   protected def valueOfTerm(name: String): Option[Any]
 
+  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
+
+  protected def conf: SparkConf
+
+  protected def postStart(): Unit = {
+    entries = new SparkEntries(conf)
+
+    if (isSparkSessionPresent()) {
+      bind("spark",
+        sparkEntries.sparkSession().getClass.getCanonicalName,
+        sparkEntries.sparkSession(),
+        List("""@transient"""))
+      bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient"""))
+
+      execute("import org.apache.spark.SparkContext._")
+      execute("import spark.implicits._")
+      execute("import spark.sql")
+      execute("import org.apache.spark.sql.functions._")
+    } else {
+      bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient"""))
+      val sqlContext = Option(sparkEntries.hivectx()).getOrElse(sparkEntries.sqlctx())
+      bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient"""))
+
+      execute("import org.apache.spark.SparkContext._")
+      execute("import sqlContext.implicits._")
+      execute("import sqlContext.sql")
+      execute("import org.apache.spark.sql.functions._")
+    }
+  }
+
+  override def close(): Unit = {
+    if (entries != null) {
+      entries.stop()
+      entries = null
+    }
+  }
+
+  private def isSparkSessionPresent(): Boolean = {
+    try {
+      Class.forName("org.apache.spark.sql.SparkSession")
+      true
+    } catch {
+      case _: ClassNotFoundException | _: NoClassDefFoundError => false
+    }
+  }
+
   override protected[repl] def execute(code: String): Interpreter.ExecuteResponse =
     restoreContextClassLoader {
       require(isStarted())

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/BypassPySparkJob.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/BypassPySparkJob.scala b/repl/src/main/scala/org/apache/livy/repl/BypassPySparkJob.scala
index 6a9bdd1..7e79c41 100644
--- a/repl/src/main/scala/org/apache/livy/repl/BypassPySparkJob.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/BypassPySparkJob.scala
@@ -19,16 +19,13 @@ package org.apache.livy.repl
 import java.nio.charset.StandardCharsets
 
 import org.apache.livy.{Job, JobContext}
+import org.apache.livy.sessions._
 
 class BypassPySparkJob(
     serializedJob: Array[Byte],
-    replDriver: ReplDriver) extends Job[Array[Byte]] {
+    pi: PythonInterpreter) extends Job[Array[Byte]] {
 
   override def call(jc: JobContext): Array[Byte] = {
-    val interpreter = replDriver.interpreter
-    require(interpreter != null && interpreter.isInstanceOf[PythonInterpreter])
-    val pi = interpreter.asInstanceOf[PythonInterpreter]
-
     val resultByteArray = pi.pysparkJobProcessor.processBypassJob(serializedJob)
     val resultString = new String(resultByteArray, StandardCharsets.UTF_8)
     if (resultString.startsWith("Client job error:")) {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/Interpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/Interpreter.scala b/repl/src/main/scala/org/apache/livy/repl/Interpreter.scala
index 058b20b..860513e 100644
--- a/repl/src/main/scala/org/apache/livy/repl/Interpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/Interpreter.scala
@@ -17,7 +17,6 @@
 
 package org.apache.livy.repl
 
-import org.apache.spark.SparkContext
 import org.json4s.JObject
 
 object Interpreter {
@@ -38,10 +37,8 @@ trait Interpreter {
 
   /**
    * Start the Interpreter.
-   *
-   * @return A SparkContext
    */
-  def start(): SparkContext
+  def start(): Unit
 
   /**
    * Execute the code and return the result, it may

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/ProcessInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/ProcessInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/ProcessInterpreter.scala
index 7995ba0..cb3e0ea 100644
--- a/repl/src/main/scala/org/apache/livy/repl/ProcessInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/ProcessInterpreter.scala
@@ -23,11 +23,9 @@ import java.util.concurrent.locks.ReentrantLock
 import scala.concurrent.Promise
 import scala.io.Source
 
-import org.apache.spark.SparkContext
 import org.json4s.JValue
 
 import org.apache.livy.{Logging, Utils}
-import org.apache.livy.client.common.ClientConf
 
 private sealed trait Request
 private case class ExecuteRequest(code: String, promise: Promise[JValue]) extends Request
@@ -45,14 +43,8 @@ abstract class ProcessInterpreter(process: Process)
   protected[this] val stdin = new PrintWriter(process.getOutputStream)
   protected[this] val stdout = new BufferedReader(new InputStreamReader(process.getInputStream), 1)
 
-  override def start(): SparkContext = {
+  override def start(): Unit = {
     waitUntilReady()
-
-    if (ClientConf.TEST_MODE) {
-      null.asInstanceOf[SparkContext]
-    } else {
-      SparkContext.getOrCreate()
-    }
   }
 
   override protected[repl] def execute(code: String): Interpreter.ExecuteResponse = {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index bfd5d76..f4e16f8 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -37,21 +37,18 @@ import py4j.reflection.PythonProxyHandler
 
 import org.apache.livy.Logging
 import org.apache.livy.client.common.ClientConf
-import org.apache.livy.rsc.BaseProtocol
-import org.apache.livy.rsc.driver.BypassJobWrapper
+import org.apache.livy.rsc.driver.SparkEntries
 import org.apache.livy.sessions._
 
 // scalastyle:off println
 object PythonInterpreter extends Logging {
 
-  def apply(conf: SparkConf, kind: Kind): Interpreter = {
-    val pythonExec = kind match {
-        case PySpark() => sys.env.getOrElse("PYSPARK_PYTHON", "python")
-        case PySpark3() => sys.env.getOrElse("PYSPARK3_PYTHON", "python3")
-        case _ => throw new IllegalArgumentException(s"Unknown kind: $kind")
-    }
+  def apply(conf: SparkConf, sparkEntries: SparkEntries): Interpreter = {
+    val pythonExec = sys.env.get("PYSPARK_PYTHON")
+      .orElse(sys.props.get("pyspark.python")) // This java property is only used for internal UT.
+      .getOrElse("python")
 
-    val gatewayServer = new GatewayServer(null, 0)
+    val gatewayServer = new GatewayServer(sparkEntries, 0)
     gatewayServer.start()
 
     val builder = new ProcessBuilder(Seq(pythonExec, createFakeShell().toString).asJava)
@@ -71,7 +68,7 @@ object PythonInterpreter extends Logging {
     env.put("LIVY_SPARK_MAJOR_VERSION", conf.get("spark.livy.spark_major_version", "1"))
     builder.redirectError(Redirect.PIPE)
     val process = builder.start()
-    new PythonInterpreter(process, gatewayServer, kind.toString)
+    new PythonInterpreter(process, gatewayServer)
   }
 
   private def findPySparkArchives(): Seq[String] = {
@@ -188,14 +185,12 @@ object PythonInterpreter extends Logging {
 
 private class PythonInterpreter(
     process: Process,
-    gatewayServer: GatewayServer,
-    pyKind: String)
+    gatewayServer: GatewayServer)
   extends ProcessInterpreter(process)
-  with Logging
-{
+  with Logging {
   implicit val formats = DefaultFormats
 
-  override def kind: String = pyKind
+  override def kind: String = "pyspark"
 
   private[repl] val pysparkJobProcessor =
     PythonInterpreter.initiatePy4jCallbackGateway(gatewayServer)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala b/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
index 75966be..7f35982 100644
--- a/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
@@ -22,9 +22,9 @@ import scala.concurrent.duration.Duration
 
 import io.netty.channel.ChannelHandlerContext
 import org.apache.spark.SparkConf
-import org.apache.spark.api.java.JavaSparkContext
 
 import org.apache.livy.Logging
+import org.apache.livy.client.common.ClientConf
 import org.apache.livy.rsc.{BaseProtocol, ReplJobResults, RSCConf}
 import org.apache.livy.rsc.BaseProtocol.ReplState
 import org.apache.livy.rsc.driver._
@@ -37,23 +37,11 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
 
   private[repl] var session: Session = _
 
-  private val kind = Kind(livyConf.get(RSCConf.Entry.SESSION_KIND))
-
-  private[repl] var interpreter: Interpreter = _
-
-  override protected def initializeContext(): JavaSparkContext = {
-    interpreter = kind match {
-      case PySpark() => PythonInterpreter(conf, PySpark())
-      case PySpark3() =>
-        PythonInterpreter(conf, PySpark3())
-      case Spark() => new SparkInterpreter(conf)
-      case SparkR() => SparkRInterpreter(conf)
-    }
-    session = new Session(livyConf, interpreter, { s => broadcast(new ReplState(s.toString)) })
-
-    Option(Await.result(session.start(), Duration.Inf))
-      .map(new JavaSparkContext(_))
-      .orNull
+  override protected def initializeSparkEntries(): SparkEntries = {
+    session = new Session(livyConf = livyConf,
+      sparkConf = conf,
+      stateChangedCallback = { s => broadcast(new ReplState(s.toString)) })
+    Await.result(session.start(), Duration.Inf)
   }
 
   override protected def shutdownContext(): Unit = {
@@ -67,7 +55,7 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
   }
 
   def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.ReplJobRequest): Int = {
-    session.execute(msg.code)
+    session.execute(msg.code, msg.codeType)
   }
 
   def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.CancelReplJobRequest): Unit = {
@@ -100,27 +88,27 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
   }
 
   override protected def createWrapper(msg: BaseProtocol.BypassJobRequest): BypassJobWrapper = {
-    kind match {
-      case PySpark() | PySpark3() => new BypassJobWrapper(this, msg.id,
-        new BypassPySparkJob(msg.serializedJob, this))
+    Kind(msg.jobType) match {
+      case PySpark() =>
+        new BypassJobWrapper(this, msg.id,
+          new BypassPySparkJob(msg.serializedJob,
+            session.interpreter(PySpark()).asInstanceOf[PythonInterpreter]))
       case _ => super.createWrapper(msg)
     }
   }
 
   override protected def addFile(path: String): Unit = {
-    require(interpreter != null)
-    interpreter match {
-      case pi: PythonInterpreter => pi.addFile(path)
-      case _ => super.addFile(path)
+    if (!ClientConf.TEST_MODE) {
+      session.interpreter(PySpark()).asInstanceOf[PythonInterpreter].addFile(path)
     }
+    super.addFile(path)
   }
 
   override protected def addJarOrPyFile(path: String): Unit = {
-    require(interpreter != null)
-    interpreter match {
-      case pi: PythonInterpreter => pi.addPyFile(this, conf, path)
-      case _ => super.addJarOrPyFile(path)
+    if (!ClientConf.TEST_MODE) {
+      session.interpreter(PySpark()).asInstanceOf[PythonInterpreter].addPyFile(this, conf, path)
     }
+    super.addJarOrPyFile(path)
   }
 
   override protected def onClientAuthenticated(client: Rpc): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/Session.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/Session.scala b/repl/src/main/scala/org/apache/livy/repl/Session.scala
index 40176ea..f245193 100644
--- a/repl/src/main/scala/org/apache/livy/repl/Session.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/Session.scala
@@ -23,17 +23,18 @@ import java.util.concurrent.Executors
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.json4s.jackson.JsonMethods.{compact, render}
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 
 import org.apache.livy.Logging
 import org.apache.livy.rsc.RSCConf
-import org.apache.livy.rsc.driver.{Statement, StatementState}
+import org.apache.livy.rsc.driver.{SparkEntries, Statement, StatementState}
 import org.apache.livy.sessions._
 
 object Session {
@@ -49,7 +50,8 @@ object Session {
 
 class Session(
     livyConf: RSCConf,
-    interpreter: Interpreter,
+    sparkConf: SparkConf,
+    mockSparkInterpreter: Option[SparkInterpreter] = None,
     stateChangedCallback: SessionState => Unit = { _ => })
   extends Logging {
   import Session._
@@ -62,8 +64,6 @@ class Session(
 
   private implicit val formats = DefaultFormats
 
-  @volatile private[repl] var _sc: Option[SparkContext] = None
-
   private var _state: SessionState = SessionState.NotStarted()
 
   // Number of statements kept in driver's memory
@@ -77,40 +77,88 @@ class Session(
 
   private val newStatementId = new AtomicInteger(0)
 
+  private val defaultInterpKind = Kind(livyConf.get(RSCConf.Entry.SESSION_KIND))
+
+  private val interpGroup = new mutable.HashMap[Kind, Interpreter]()
+
+  @volatile private var entries: SparkEntries = _
+
   stateChangedCallback(_state)
 
-  def start(): Future[SparkContext] = {
+  private def sc: SparkContext = {
+    require(entries != null)
+    entries.sc().sc
+  }
+
+  private[repl] def interpreter(kind: Kind): Interpreter = interpGroup.synchronized {
+    if (interpGroup.contains(kind)) {
+      interpGroup(kind)
+    } else {
+      require(entries != null,
+        "SparkEntries should not be null when lazily initialize other interpreters.")
+
+      val interp = kind match {
+        case Spark() =>
+          // This branch should never be reached: SparkInterpreter is created eagerly.
+          throw new IllegalStateException("SparkInterpreter should not be lazily created.")
+        case PySpark() => PythonInterpreter(sparkConf, entries)
+        case SparkR() => SparkRInterpreter(sparkConf, entries)
+      }
+      interp.start()
+      interpGroup(kind) = interp
+
+      interp
+    }
+  }
+
+  def start(): Future[SparkEntries] = {
     val future = Future {
       changeState(SessionState.Starting())
-      val sc = interpreter.start()
-      _sc = Option(sc)
+
+      // Always start SparkInterpreter first, because we rely on it to initialize
+      // SparkContext and create SparkEntries.
+      val sparkInterp = mockSparkInterpreter.getOrElse(new SparkInterpreter(sparkConf))
+      sparkInterp.start()
+
+      entries = sparkInterp.sparkEntries()
+      require(entries != null, "SparkEntries object should not be null in Spark Interpreter.")
+      interpGroup.synchronized {
+        interpGroup.put(Spark(), sparkInterp)
+      }
+
       changeState(SessionState.Idle())
-      sc
+      entries
     }(interpreterExecutor)
 
     future.onFailure { case _ => changeState(SessionState.Error()) }(interpreterExecutor)
     future
   }
 
-  def kind: String = interpreter.kind
-
   def state: SessionState = _state
 
   def statements: collection.Map[Int, Statement] = _statements.synchronized {
     _statements.toMap
   }
 
-  def execute(code: String): Int = {
+  def execute(code: String, codeType: String = null): Int = {
+    val tpe = if (codeType != null) {
+      Kind(codeType)
+    } else if (defaultInterpKind != Shared()) {
+      defaultInterpKind
+    } else {
+      throw new IllegalArgumentException(s"Code type should be specified if session kind is shared")
+    }
+
     val statementId = newStatementId.getAndIncrement()
     val statement = new Statement(statementId, code, StatementState.Waiting, null)
     _statements.synchronized { _statements(statementId) = statement }
 
     Future {
-      setJobGroup(statementId)
+      setJobGroup(tpe, statementId)
       statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
 
       if (statement.state.get() == StatementState.Running) {
-        statement.output = executeCode(statementId, code)
+        statement.output = executeCode(interpreter(tpe), statementId, code)
       }
 
       statement.compareAndTransit(StatementState.Running, StatementState.Available)
@@ -150,7 +198,7 @@ class Session(
           info(s"Failed to cancel statement $statementId.")
           statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
         } else {
-          _sc.foreach(_.cancelJobGroup(statementId.toString))
+          sc.cancelJobGroup(statementId.toString)
           if (statement.state.get() == StatementState.Cancelling) {
             Thread.sleep(livyConf.getTimeAsMs(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL))
           }
@@ -166,7 +214,7 @@ class Session(
   def close(): Unit = {
     interpreterExecutor.shutdown()
     cancelExecutor.shutdown()
-    interpreter.close()
+    interpGroup.values.foreach(_.close())
   }
 
   /**
@@ -175,21 +223,19 @@ class Session(
   def progressOfStatement(stmtId: Int): Double = {
     val jobGroup = statementIdToJobGroup(stmtId)
 
-    _sc.map { sc =>
-      val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
-      val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
-      val stages = jobs.flatMap { job =>
-        job.stageIds().flatMap(sc.statusTracker.getStageInfo)
-      }
+    val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+    val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+    val stages = jobs.flatMap { job =>
+      job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+    }
 
-      val taskCount = stages.map(_.numTasks).sum
-      val completedTaskCount = stages.map(_.numCompletedTasks).sum
-      if (taskCount == 0) {
-        0.0
-      } else {
-        completedTaskCount.toDouble / taskCount
-      }
-    }.getOrElse(0.0)
+    val taskCount = stages.map(_.numTasks).sum
+    val completedTaskCount = stages.map(_.numCompletedTasks).sum
+    if (taskCount == 0) {
+      0.0
+    } else {
+      completedTaskCount.toDouble / taskCount
+    }
   }
 
   private def changeState(newState: SessionState): Unit = {
@@ -199,7 +245,7 @@ class Session(
     stateChangedCallback(newState)
   }
 
-  private def executeCode(executionCount: Int, code: String): String = {
+  private def executeCode(interp: Interpreter, executionCount: Int, code: String): String = {
     changeState(SessionState.Busy())
 
     def transitToIdle() = {
@@ -210,7 +256,7 @@ class Session(
     }
 
     val resultInJson = try {
-      interpreter.execute(code) match {
+      interp.execute(code) match {
         case Interpreter.ExecuteSuccess(data) =>
           transitToIdle()
 
@@ -261,17 +307,17 @@ class Session(
     compact(render(resultInJson))
   }
 
-  private def setJobGroup(statementId: Int): String = {
+  private def setJobGroup(codeType: Kind, statementId: Int): String = {
     val jobGroup = statementIdToJobGroup(statementId)
-    val cmd = Kind(interpreter.kind) match {
+    val cmd = codeType match {
       case Spark() =>
         // A dummy value to avoid automatic value binding in scala REPL.
         s"""val _livyJobGroup$jobGroup = sc.setJobGroup("$jobGroup",""" +
           s""""Job group for statement $jobGroup")"""
-      case PySpark() | PySpark3() =>
+      case PySpark() =>
         s"""sc.setJobGroup("$jobGroup", "Job group for statement $jobGroup")"""
       case SparkR() =>
-        interpreter.asInstanceOf[SparkRInterpreter].sparkMajorVersion match {
+        sc.getConf.get("spark.livy.spark_major_version", "1") match {
           case "1" =>
             s"""setJobGroup(sc, "$jobGroup", "Job group for statement $jobGroup", """ +
               "FALSE)"
@@ -280,7 +326,7 @@ class Session(
         }
     }
     // Set the job group
-    executeCode(statementId, cmd)
+    executeCode(interpreter(codeType), statementId, cmd)
   }
 
   private def statementIdToJobGroup(statementId: Int): String = {

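For readers skimming the diff, a compact sketch of the two ideas the reworked Session combines: non-Spark interpreters are created lazily under the map's monitor on first use, and a statement's code type falls back to the session's default kind unless the session kind is shared, in which case the type must be explicit. All names below are toy stand-ins rather than the real Session/Interpreter types.

    import scala.collection.mutable

    object SharedSessionSketch extends App {
      sealed trait Kind
      case object Spark   extends Kind
      case object PySpark extends Kind
      case object Shared  extends Kind

      class ToySession(defaultKind: Kind) {
        // Values stand in for Interpreter instances, keyed by language kind.
        private val interpGroup = new mutable.HashMap[Kind, String]()

        // Lazily start an interpreter on first use, guarded by the map's monitor.
        def interpreter(kind: Kind): String = interpGroup.synchronized {
          interpGroup.getOrElseUpdate(kind, s"started-$kind")
        }

        // Explicit code type wins; otherwise fall back to the session default,
        // and a shared session refuses statements without a type.
        def resolveKind(codeType: Option[Kind]): Kind = codeType match {
          case Some(k)                       => k
          case None if defaultKind != Shared => defaultKind
          case None =>
            throw new IllegalArgumentException("Code type must be given for a shared session")
        }
      }

      val session = new ToySession(Shared)
      println(session.interpreter(session.resolveKind(Some(PySpark)))) // started-PySpark
    }
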
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/SparkContextInitializer.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/SparkContextInitializer.scala b/repl/src/main/scala/org/apache/livy/repl/SparkContextInitializer.scala
deleted file mode 100644
index 4d47824..0000000
--- a/repl/src/main/scala/org/apache/livy/repl/SparkContextInitializer.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.repl
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-import org.apache.livy.Logging
-
-/**
- * A mixin trait for Spark entry point creation. This trait exists two different code path
- * separately for Spark1 and Spark2, depends on whether SparkSession exists or not.
- */
-trait SparkContextInitializer extends Logging {
-  self: SparkInterpreter =>
-
-  def createSparkContext(conf: SparkConf): Unit = {
-    if (isSparkSessionPresent()) {
-      spark2CreateContext(conf)
-    } else {
-      spark1CreateContext(conf)
-    }
-  }
-
-  private def spark1CreateContext(conf: SparkConf): Unit = {
-    sparkContext = SparkContext.getOrCreate(conf)
-    var sqlContext: Object = null
-
-    if (conf.getBoolean("spark.repl.enableHiveContext", false)) {
-      try {
-        val loader = Option(Thread.currentThread().getContextClassLoader)
-          .getOrElse(getClass.getClassLoader)
-        if (loader.getResource("hive-site.xml") == null) {
-          warn("livy.repl.enable-hive-context is true but no hive-site.xml found on classpath.")
-        }
-
-        sqlContext = Class.forName("org.apache.spark.sql.hive.HiveContext")
-          .getConstructor(classOf[SparkContext]).newInstance(sparkContext).asInstanceOf[Object]
-        info("Created sql context (with Hive support).")
-      } catch {
-        case _: NoClassDefFoundError =>
-          sqlContext = Class.forName("org.apache.spark.sql.SQLContext")
-            .getConstructor(classOf[SparkContext]).newInstance(sparkContext).asInstanceOf[Object]
-          info("Created sql context.")
-      }
-    } else {
-      sqlContext = Class.forName("org.apache.spark.sql.SQLContext")
-        .getConstructor(classOf[SparkContext]).newInstance(sparkContext).asInstanceOf[Object]
-      info("Created sql context.")
-    }
-
-    bind("sc", "org.apache.spark.SparkContext", sparkContext, List("""@transient"""))
-    bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient"""))
-
-    execute("import org.apache.spark.SparkContext._")
-    execute("import sqlContext.implicits._")
-    execute("import sqlContext.sql")
-    execute("import org.apache.spark.sql.functions._")
-  }
-
-  private def spark2CreateContext(conf: SparkConf): Unit = {
-    val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$")
-    val sparkObj = sparkClz.getField("MODULE$").get(null)
-
-    val builderMethod = sparkClz.getMethod("builder")
-    val builder = builderMethod.invoke(sparkObj)
-    builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
-
-    var spark: Object = null
-    if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive") {
-      if (sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean]) {
-        val loader = Option(Thread.currentThread().getContextClassLoader)
-          .getOrElse(getClass.getClassLoader)
-        if (loader.getResource("hive-site.xml") == null) {
-          warn("livy.repl.enable-hive-context is true but no hive-site.xml found on classpath.")
-        }
-
-        builder.getClass.getMethod("enableHiveSupport").invoke(builder)
-        spark = builder.getClass.getMethod("getOrCreate").invoke(builder)
-        info("Created Spark session (with Hive support).")
-      } else {
-        builder.getClass.getMethod("config", classOf[String], classOf[String])
-          .invoke(builder, "spark.sql.catalogImplementation", "in-memory")
-        spark = builder.getClass.getMethod("getOrCreate").invoke(builder)
-        info("Created Spark session.")
-      }
-    } else {
-      spark = builder.getClass.getMethod("getOrCreate").invoke(builder)
-      info("Created Spark session.")
-    }
-
-    sparkContext = spark.getClass.getMethod("sparkContext").invoke(spark)
-      .asInstanceOf[SparkContext]
-
-    bind("spark", spark.getClass.getCanonicalName, spark, List("""@transient"""))
-    bind("sc", "org.apache.spark.SparkContext", sparkContext, List("""@transient"""))
-
-    execute("import org.apache.spark.SparkContext._")
-    execute("import spark.implicits._")
-    execute("import spark.sql")
-    execute("import org.apache.spark.sql.functions._")
-  }
-
-  private def isSparkSessionPresent(): Boolean = {
-    try {
-      Class.forName("org.apache.spark.sql.SparkSession")
-      true
-    } catch {
-      case _: ClassNotFoundException | _: NoClassDefFoundError => false
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
index b745861..9330248 100644
--- a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
@@ -17,8 +17,7 @@
 
 package org.apache.livy.repl
 
-import java.io.{File, FileOutputStream}
-import java.lang.ProcessBuilder.Redirect
+import java.io.File
 import java.nio.file.Files
 import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
@@ -28,13 +27,14 @@ import scala.reflect.runtime.universe
 
 import org.apache.commons.codec.binary.Base64
 import org.apache.commons.lang.StringEscapeUtils
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SQLContext
 import org.json4s._
 import org.json4s.JsonDSL._
 
 import org.apache.livy.client.common.ClientConf
-import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.SparkEntries
 
 private case class RequestResponse(content: String, error: Boolean)
 
@@ -44,6 +44,7 @@ object SparkRInterpreter {
   private val LIVY_ERROR_MARKER = "----LIVY_END_OF_ERROR----"
   private val PRINT_MARKER = f"""print("$LIVY_END_MARKER")"""
   private val EXPECTED_OUTPUT = f"""[1] "$LIVY_END_MARKER""""
+  private var sparkEntries: SparkEntries = null
 
   private val PLOT_REGEX = (
     "(" +
@@ -67,7 +68,8 @@ object SparkRInterpreter {
     ")"
     ).r.unanchored
 
-  def apply(conf: SparkConf): SparkRInterpreter = {
+  def apply(conf: SparkConf, entries: SparkEntries): SparkRInterpreter = {
+    sparkEntries = entries
     val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
     val mirror = universe.runtimeMirror(getClass.getClassLoader)
     val sparkRBackendClass = mirror.classLoader.loadClass("org.apache.spark.api.r.RBackend")
@@ -119,8 +121,7 @@ object SparkRInterpreter {
       builder.redirectErrorStream(true)
       val process = builder.start()
       new SparkRInterpreter(process, backendInstance, backendThread,
-        conf.get("spark.livy.spark_major_version", "1"),
-        conf.getBoolean("spark.repl.enableHiveContext", false))
+        conf.getInt("spark.livy.spark_major_version", 1))
     } catch {
       case e: Exception =>
         if (backendThread != null) {
@@ -129,13 +130,27 @@ object SparkRInterpreter {
         throw e
     }
   }
+
+  def getSparkContext(): JavaSparkContext = {
+    require(sparkEntries != null)
+    sparkEntries.sc()
+  }
+
+  def getSparkSession(): Object = {
+    require(sparkEntries != null)
+    sparkEntries.sparkSession()
+  }
+
+  def getSQLContext(): SQLContext = {
+    require(sparkEntries != null)
+    if (sparkEntries.hivectx() != null) sparkEntries.hivectx() else sparkEntries.sqlctx()
+  }
 }
 
 class SparkRInterpreter(process: Process,
     backendInstance: Any,
     backendThread: Thread,
-    val sparkMajorVersion: String,
-    hiveEnabled: Boolean)
+    val sparkMajorVersion: Int)
   extends ProcessInterpreter(process) {
   import SparkRInterpreter._
 
@@ -149,24 +164,23 @@ class SparkRInterpreter(process: Process,
     // Set the option to catch and ignore errors instead of halting.
     sendRequest("options(error = dump.frames)")
     if (!ClientConf.TEST_MODE) {
+      // scalastyle:off line.size.limit
       sendRequest("library(SparkR)")
-      if (sparkMajorVersion >= "2") {
-        if (hiveEnabled) {
-          sendRequest("spark <- SparkR::sparkR.session()")
-        } else {
-          sendRequest("spark <- SparkR::sparkR.session(enableHiveSupport=FALSE)")
-        }
-        sendRequest(
-          """sc <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-            "getJavaSparkContext", spark)""")
-      } else {
-        sendRequest("sc <- sparkR.init()")
-        if (hiveEnabled) {
-          sendRequest("sqlContext <- sparkRHive.init(sc)")
-        } else {
-          sendRequest("sqlContext <- sparkRSQL.init(sc)")
-        }
+      sendRequest("""port <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")""")
+      sendRequest("""SparkR:::connectBackend("localhost", port, 6000)""")
+      sendRequest("""assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)""")
+
+      sendRequest("""assign(".sc", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkContext"), envir = SparkR:::.sparkREnv)""")
+      sendRequest("""assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""")
+
+      if (sparkMajorVersion >= 2) {
+        sendRequest("""assign(".sparkRsession", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkSession"), envir = SparkR:::.sparkREnv)""")
+        sendRequest("""assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""")
       }
+
+      sendRequest("""assign(".sqlc", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSQLContext"), envir = SparkR:::.sparkREnv)""")
+      sendRequest("""assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)""")
+      // scalastyle:on line.size.limit
     }
 
     isStarted.countDown()

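The R bootstrap above no longer builds its own context: it connects to the backend the driver already started and then pulls the JVM-side entries through the static getSparkContext/getSparkSession/getSQLContext accessors via SparkR:::callJStatic, so R shares the same SparkContext as the Scala and Python interpreters. A toy sketch of that static-accessor pattern (illustrative names, not the real companion object):

    // The JVM side stashes the shared entries once; a child process (here, the R
    // interpreter) later fetches them through a static call instead of creating its own.
    object EntriesHolder {
      @volatile private var entries: Option[String] = None // stands in for SparkEntries

      def set(e: String): Unit = { entries = Some(e) }

      def getSparkContext(): String =
        entries.getOrElse(throw new IllegalStateException("Spark entries not initialized yet"))
    }
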
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
index a13f924..bcea0df 100644
--- a/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
@@ -24,6 +24,7 @@ import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import org.apache.spark.SparkConf
 import org.json4s._
 import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.concurrent.Eventually._
@@ -31,13 +32,16 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.livy.LivyBaseUnitTestSuite
 import org.apache.livy.rsc.RSCConf
 import org.apache.livy.rsc.driver.{Statement, StatementState}
-import org.apache.livy.sessions.SessionState
+import org.apache.livy.sessions._
 
-abstract class BaseSessionSpec extends FlatSpec with Matchers with LivyBaseUnitTestSuite {
+abstract class BaseSessionSpec(kind: Kind)
+    extends FlatSpec with Matchers with LivyBaseUnitTestSuite {
 
   implicit val formats = DefaultFormats
 
-  private val rscConf = new RSCConf(new Properties())
+  private val rscConf = new RSCConf(new Properties()).set(RSCConf.Entry.SESSION_KIND, kind.toString)
+
+  private val sparkConf = new SparkConf()
 
   protected def execute(session: Session)(code: String): Statement = {
     val id = session.execute(code)
@@ -51,7 +55,7 @@ abstract class BaseSessionSpec extends FlatSpec with Matchers with LivyBaseUnitT
   protected def withSession(testCode: Session => Any): Unit = {
     val stateChangedCalled = new AtomicInteger()
     val session =
-      new Session(rscConf, createInterpreter(), { _ => stateChangedCalled.incrementAndGet() })
+      new Session(rscConf, sparkConf, None, { _ => stateChangedCalled.incrementAndGet() })
     try {
       // Session's constructor should fire an initial state change event.
       stateChangedCalled.intValue() shouldBe 1
@@ -65,16 +69,12 @@ abstract class BaseSessionSpec extends FlatSpec with Matchers with LivyBaseUnitT
     }
   }
 
-  protected def createInterpreter(): Interpreter
-
   it should "start in the starting or idle state" in {
-    val session = new Session(rscConf, createInterpreter())
+    val session = new Session(rscConf, sparkConf)
     val future = session.start()
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
-        session.state should (equal (SessionState.Starting()) or equal (SessionState.Idle()))
-      }
       Await.ready(future, 60 seconds)
+      session.state should (equal (SessionState.Starting()) or equal (SessionState.Idle()))
     } finally {
       session.close()
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
index 1404765..fb0704e 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -22,7 +22,7 @@ import org.json4s.{DefaultFormats, JNull, JValue}
 import org.json4s.JsonDSL._
 import org.scalatest._
 
-import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.SparkEntries
 import org.apache.livy.sessions._
 
 abstract class PythonBaseInterpreterSpec extends BaseInterpreterSpec {
@@ -244,7 +244,10 @@ class Python2InterpreterSpec extends PythonBaseInterpreterSpec {
 
   implicit val formats = DefaultFormats
 
-  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
+  override def createInterpreter(): Interpreter = {
+    val sparkConf = new SparkConf()
+    PythonInterpreter(sparkConf, new SparkEntries(sparkConf))
+  }
 
   // Scalastyle is treating unicode escape as non ascii characters. Turn off the check.
   // scalastyle:off non.ascii.character.disallowed
@@ -262,7 +265,7 @@ class Python2InterpreterSpec extends PythonBaseInterpreterSpec {
   // scalastyle:on non.ascii.character.disallowed
 }
 
-class Python3InterpreterSpec extends PythonBaseInterpreterSpec {
+class Python3InterpreterSpec extends PythonBaseInterpreterSpec with BeforeAndAfterAll {
 
   implicit val formats = DefaultFormats
 
@@ -271,7 +274,20 @@ class Python3InterpreterSpec extends PythonBaseInterpreterSpec {
     test()
   }
 
-  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sys.props.put("pyspark.python", "python3")
+  }
+
+  override def afterAll(): Unit = {
+    sys.props.remove("pyspark.python")
+    super.afterAll()
+  }
+
+  override def createInterpreter(): Interpreter = {
+    val sparkConf = new SparkConf()
+    PythonInterpreter(sparkConf, new SparkEntries(sparkConf))
+  }
 
   it should "check python version is 3.x" in withInterpreter { interpreter =>
     val response = interpreter.execute("""import sys

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
index 0883f27..69d4131 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
@@ -17,15 +17,13 @@
 
 package org.apache.livy.repl
 
-import org.apache.spark.SparkConf
 import org.json4s.Extraction
 import org.json4s.jackson.JsonMethods.parse
 import org.scalatest._
 
-import org.apache.livy.rsc.RSCConf
 import org.apache.livy.sessions._
 
-abstract class PythonSessionSpec extends BaseSessionSpec {
+abstract class PythonSessionSpec extends BaseSessionSpec(PySpark()) {
 
   it should "execute `1 + 2` == 3" in withSession { session =>
     val statement = execute(session)("1 + 2")
@@ -172,18 +170,25 @@ abstract class PythonSessionSpec extends BaseSessionSpec {
   }
 }
 
-class Python2SessionSpec extends PythonSessionSpec {
-  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark())
-}
+class Python2SessionSpec extends PythonSessionSpec
 
-class Python3SessionSpec extends PythonSessionSpec {
+class Python3SessionSpec extends PythonSessionSpec with BeforeAndAfterAll {
 
   override protected def withFixture(test: NoArgTest): Outcome = {
     assume(!sys.props.getOrElse("skipPySpark3Tests", "false").toBoolean, "Skipping PySpark3 tests.")
     test()
   }
 
-  override def createInterpreter(): Interpreter = PythonInterpreter(new SparkConf(), PySpark3())
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sys.props.put("pyspark.python", "python3")
+  }
+
+  override def afterAll(): Unit = {
+    sys.props.remove("pyspark.python")
+    super.afterAll()
+  }
+
 
   it should "check python version is 3.x" in withSession { session =>
     val statement = execute(session)(

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala b/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
index 6537f0c..6d7094d 100644
--- a/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/ReplDriverSuite.scala
@@ -53,7 +53,7 @@ class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite {
       // This is sort of what InteractiveSession.scala does to detect an idle session.
       client.submit(new PingJob()).get(60, TimeUnit.SECONDS)
 
-      val statementId = client.submitReplCode("1 + 1").get
+      val statementId = client.submitReplCode("1 + 1", "spark").get
       eventually(timeout(30 seconds), interval(100 millis)) {
         val rawResult =
           client.getReplJobResults(statementId, 1).get(10, TimeUnit.SECONDS).statements(0)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
index 090b7cb..554d59e 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
@@ -20,41 +20,41 @@ package org.apache.livy.repl
 import java.util.Properties
 import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 
-import org.mockito.Mockito.when
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.FunSpec
+import org.apache.spark.SparkConf
+import org.scalatest.{BeforeAndAfter, FunSpec}
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
-import org.scalatest.mock.MockitoSugar.mock
 import org.scalatest.time._
 
 import org.apache.livy.LivyBaseUnitTestSuite
 import org.apache.livy.repl.Interpreter.ExecuteResponse
 import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
 
-class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
+class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite with BeforeAndAfter {
   override implicit val patienceConfig =
-    PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
+    PatienceConfig(timeout = scaled(Span(30, Seconds)), interval = scaled(Span(100, Millis)))
 
-  private val rscConf = new RSCConf(new Properties())
+  private val rscConf = new RSCConf(new Properties()).set(RSCConf.Entry.SESSION_KIND, "spark")
 
   describe("Session") {
+    var session: Session = null
+
+    after {
+      if (session != null) {
+        session.close()
+        session = null
+      }
+    }
+
     it("should call state changed callbacks in happy path") {
       val expectedStateTransitions =
         Array("not_started", "starting", "idle", "busy", "idle", "busy", "idle")
       val actualStateTransitions = new ConcurrentLinkedQueue[String]()
 
-      val interpreter = mock[Interpreter]
-      when(interpreter.kind).thenAnswer(new Answer[String] {
-        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
-      })
-
-      val session =
-        new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
-
+      session = new Session(rscConf, new SparkConf(), None,
+        { s => actualStateTransitions.add(s.toString) })
       session.start()
-
       session.execute("")
 
       eventually {
@@ -64,23 +64,19 @@ class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
 
     it("should not transit to idle if there're any pending statements.") {
       val expectedStateTransitions =
-        Array("not_started", "busy", "busy", "busy", "idle", "busy", "idle")
+        Array("not_started", "starting", "idle", "busy", "busy", "busy", "idle", "busy", "idle")
       val actualStateTransitions = new ConcurrentLinkedQueue[String]()
 
-      val interpreter = mock[Interpreter]
-      when(interpreter.kind).thenAnswer(new Answer[String] {
-        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
-      })
-
       val blockFirstExecuteCall = new CountDownLatch(1)
-      when(interpreter.execute("")).thenAnswer(new Answer[Interpreter.ExecuteResponse] {
-        override def answer(invocation: InvocationOnMock): ExecuteResponse = {
+      val interpreter = new SparkInterpreter(new SparkConf()) {
+        override def execute(code: String): ExecuteResponse = {
           blockFirstExecuteCall.await(10, TimeUnit.SECONDS)
-          null
+          super.execute(code)
         }
-      })
-      val session =
-        new Session(rscConf, interpreter, { s => actualStateTransitions.add(s.toString) })
+      }
+      session = new Session(rscConf, new SparkConf(), Some(interpreter),
+        { s => actualStateTransitions.add(s.toString) })
+      session.start()
 
       for (_ <- 1 to 2) {
         session.execute("")
@@ -93,13 +89,8 @@ class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite {
     }
 
     it("should remove old statements when reaching threshold") {
-      val interpreter = mock[Interpreter]
-      when(interpreter.kind).thenAnswer(new Answer[String] {
-        override def answer(invocationOnMock: InvocationOnMock): String = "spark"
-      })
-
       rscConf.set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
-      val session = new Session(rscConf, interpreter)
+      session = new Session(rscConf, new SparkConf())
       session.start()
 
       session.statements.size should be (0)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
new file mode 100644
index 0000000..d63e22f
--- /dev/null
+++ b/repl/src/test/scala/org/apache/livy/repl/SharedSessionSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.repl
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.json4s.Extraction
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods.parse
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.livy.rsc.driver.{Statement, StatementState}
+import org.apache.livy.sessions._
+
+class SharedSessionSpec extends BaseSessionSpec(Shared()) {
+
+  private def execute(session: Session, code: String, codeType: String): Statement = {
+    val id = session.execute(code, codeType)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      val s = session.statements(id)
+      s.state.get() shouldBe StatementState.Available
+      s
+    }
+  }
+
+  it should "execute `1 + 2` == 3" in withSession { session =>
+    val statement = execute(session, "1 + 2", "spark")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "res0: Int = 3"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "access the spark context" in withSession { session =>
+    val statement = execute(session, """sc""", "spark")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+    val resultMap = result.extract[Map[String, JValue]]
+
+    // Manually extract the values since the line numbers in the exception could change.
+    resultMap("status").extract[String] should equal ("ok")
+    resultMap("execution_count").extract[Int] should equal (0)
+
+    val data = resultMap("data").extract[Map[String, JValue]]
+    data("text/plain").extract[String] should include (
+      "res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext")
+  }
+
+  it should "execute spark commands" in withSession { session =>
+    val statement = execute(session,
+      """sc.parallelize(0 to 1).map{i => i+1}.collect""".stripMargin, "spark")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map(
+        "text/plain" -> "res0: Array[Int] = Array(1, 2)"
+      )
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "throw exception if code type is not specified in shared session" in withSession {
+    session =>
+      intercept[IllegalArgumentException](session.execute("1 + 2"))
+  }
+
+  it should "execute `1 + 2 = 3` in Python" in withSession { session =>
+    val statement = execute(session, "1 + 2", "pyspark")
+    statement.id should equal (0)
+
+    val result = parse(statement.output)
+
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map("text/plain" -> "3")
+    ))
+
+    result should equal (expectedResult)
+  }
+
+  it should "execute `1 + 2 = 3` in R" in withSession { session =>
+    val statement = execute(session, "1 + 2", "sparkr")
+    statement.id should be (0)
+
+    val result = parse(statement.output)
+
+    val expectedResult = Extraction.decompose(Map(
+      "status" -> "ok",
+      "execution_count" -> 0,
+      "data" -> Map("text/plain" -> "[1] 3")
+    ))
+
+    result should be (expectedResult)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
index 374afbc..e032e15 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRInterpreterSpec.scala
@@ -18,11 +18,11 @@
 package org.apache.livy.repl
 
 import org.apache.spark.SparkConf
-import org.json4s.{DefaultFormats, JValue}
+import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.scalatest._
 
-import org.apache.livy.rsc.RSCConf
+import org.apache.livy.rsc.driver.SparkEntries
 
 class SparkRInterpreterSpec extends BaseInterpreterSpec {
 
@@ -33,7 +33,11 @@ class SparkRInterpreterSpec extends BaseInterpreterSpec {
     super.withFixture(test)
   }
 
-  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
+
+  override def createInterpreter(): Interpreter = {
+    val sparkConf = new SparkConf()
+    SparkRInterpreter(sparkConf, new SparkEntries(sparkConf))
+  }
 
   it should "execute `1 + 2` == 3" in withInterpreter { interpreter =>
     val response = interpreter.execute("1 + 2")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
index 42ed60a..dc1887d 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkRSessionSpec.scala
@@ -17,21 +17,18 @@
 
 package org.apache.livy.repl
 
-import org.apache.spark.SparkConf
 import org.json4s.Extraction
 import org.json4s.jackson.JsonMethods.parse
 
-import org.apache.livy.rsc.RSCConf
+import org.apache.livy.sessions._
 
-class SparkRSessionSpec extends BaseSessionSpec {
+class SparkRSessionSpec extends BaseSessionSpec(SparkR()) {
 
   override protected def withFixture(test: NoArgTest) = {
     assume(!sys.props.getOrElse("skipRTests", "false").toBoolean, "Skipping R tests.")
     super.withFixture(test)
   }
 
-  override def createInterpreter(): Interpreter = SparkRInterpreter(new SparkConf())
-
   it should "execute `1 + 2` == 3" in withSession { session =>
     val statement = execute(session)("1 + 2")
     statement.id should equal(0)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
index 9369519..7e51080 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
@@ -20,18 +20,15 @@ package org.apache.livy.repl
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.apache.spark.SparkConf
 import org.json4s.Extraction
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods.parse
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.livy.rsc.RSCConf
 import org.apache.livy.rsc.driver.StatementState
+import org.apache.livy.sessions._
 
-class SparkSessionSpec extends BaseSessionSpec {
-
-  override def createInterpreter(): Interpreter = new SparkInterpreter(new SparkConf())
+class SparkSessionSpec extends BaseSessionSpec(Spark()) {
 
   it should "execute `1 + 2` == 3" in withSession { session =>
     val statement = execute(session)("1 + 2")

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java b/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
index c25e98f..823e71b 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
@@ -61,17 +61,19 @@ public abstract class BaseProtocol extends RpcDispatcher {
   public static class BypassJobRequest {
 
     public final String id;
+    public final String jobType;
     public final byte[] serializedJob;
     public final boolean synchronous;
 
-    public BypassJobRequest(String id, byte[] serializedJob, boolean synchronous) {
+    public BypassJobRequest(String id, String jobType, byte[] serializedJob, boolean synchronous) {
       this.id = id;
+      this.jobType = jobType;
       this.serializedJob = serializedJob;
       this.synchronous = synchronous;
     }
 
     public BypassJobRequest() {
-      this(null, null, false);
+      this(null, null, null, false);
     }
 
   }
@@ -171,13 +173,15 @@ public abstract class BaseProtocol extends RpcDispatcher {
   public static class ReplJobRequest {
 
     public final String code;
+    public final String codeType;
 
-    public ReplJobRequest(String code) {
+    public ReplJobRequest(String code, String codeType) {
       this.code = code;
+      this.codeType = codeType;
     }
 
     public ReplJobRequest() {
-      this(null);
+      this(null, null);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index 8f46c1e..ed42e48 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -173,12 +173,8 @@ class ContextLauncher {
     }
     merge(conf, SPARK_JARS_KEY, livyJars, ",");
 
-    String kind = conf.get(SESSION_KIND);
-    if ("sparkr".equals(kind)) {
-      merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
-    } else if ("pyspark".equals(kind)) {
-      merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
-    }
+    merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
+    merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
 
     // Disable multiple attempts since the RPC server doesn't yet support multiple
     // connections for the same registered app.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
index 3161187..3fc3348 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
@@ -270,8 +270,8 @@ public class RSCClient implements LivyClient {
     return submit(new AddFileJob(uri.toString()));
   }
 
-  public String bypass(ByteBuffer serializedJob, boolean sync) {
-    return protocol.bypass(serializedJob, sync);
+  public String bypass(ByteBuffer serializedJob, String jobType, boolean sync) {
+    return protocol.bypass(serializedJob, jobType, sync);
   }
 
   public Future<BypassJobStatus> getBypassJobStatus(String id) {
@@ -286,8 +286,8 @@ public class RSCClient implements LivyClient {
     return contextInfo;
   }
 
-  public Future<Integer> submitReplCode(String code) throws Exception {
-    return deferredCall(new BaseProtocol.ReplJobRequest(code), Integer.class);
+  public Future<Integer> submitReplCode(String code, String codeType) throws Exception {
+    return deferredCall(new BaseProtocol.ReplJobRequest(code, codeType), Integer.class);
   }
 
   public void cancelReplCode(int statementId) throws Exception {
@@ -354,9 +354,10 @@ public class RSCClient implements LivyClient {
       return (Future<T>) deferredCall(new SyncJobRequest(job), Object.class);
     }
 
-    String bypass(ByteBuffer serializedJob, boolean sync) {
+    String bypass(ByteBuffer serializedJob, String jobType, boolean sync) {
       String jobId = UUID.randomUUID().toString();
-      Object msg = new BypassJobRequest(jobId, BufferUtils.toByteArray(serializedJob), sync);
+      Object msg =
+        new BypassJobRequest(jobId, jobType, BufferUtils.toByteArray(serializedJob), sync);
       deferredCall(msg, Void.class);
       return jobId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
index ddb5713..9faae0e 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
@@ -18,90 +18,47 @@
 package org.apache.livy.rsc.driver;
 
 import java.io.File;
-import java.lang.reflect.Method;
 
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.livy.JobContext;
 import org.apache.livy.rsc.Utils;
 
 class JobContextImpl implements JobContext {
 
-  private static final Logger LOG = LoggerFactory.getLogger(JobContextImpl.class);
-
-  private final JavaSparkContext sc;
   private final File localTmpDir;
-  private volatile SQLContext sqlctx;
-  private volatile HiveContext hivectx;
   private volatile JavaStreamingContext streamingctx;
   private final RSCDriver driver;
-  private volatile Object sparksession;
+  private final SparkEntries sparkEntries;
 
-  public JobContextImpl(JavaSparkContext sc, File localTmpDir, RSCDriver driver) {
-    this.sc = sc;
+  public JobContextImpl(SparkEntries sparkEntries, File localTmpDir, RSCDriver driver) {
+    this.sparkEntries = sparkEntries;
     this.localTmpDir = localTmpDir;
     this.driver = driver;
   }
 
   @Override
   public JavaSparkContext sc() {
-    return sc;
+    return sparkEntries.sc();
   }
 
   @Override
   public Object sparkSession() throws Exception {
-    if (sparksession == null) {
-      synchronized (this) {
-        if (sparksession == null) {
-          try {
-            Class<?> clz = Class.forName("org.apache.spark.sql.SparkSession$");
-            Object spark = clz.getField("MODULE$").get(null);
-            Method m = clz.getMethod("builder");
-            Object builder = m.invoke(spark);
-            builder.getClass().getMethod("sparkContext", SparkContext.class)
-              .invoke(builder, sc.sc());
-            sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder);
-          } catch (Exception e) {
-            LOG.warn("SparkSession is not supported", e);
-            throw e;
-          }
-        }
-      }
-    }
-
-    return sparksession;
+    return sparkEntries.sparkSession();
   }
 
   @Override
   public SQLContext sqlctx() {
-    if (sqlctx == null) {
-      synchronized (this) {
-        if (sqlctx == null) {
-          sqlctx = new SQLContext(sc);
-        }
-      }
-    }
-    return sqlctx;
+    return sparkEntries.sqlctx();
   }
 
   @Override
   public HiveContext hivectx() {
-    if (hivectx == null) {
-      synchronized (this) {
-        if (hivectx == null) {
-          hivectx = new HiveContext(sc.sc());
-        }
-      }
-    }
-    return hivectx;
+    return sparkEntries.hivectx();
   }
 
   @Override
@@ -113,13 +70,13 @@ class JobContextImpl implements JobContext {
   @Override
   public synchronized void createStreamingContext(long batchDuration) {
     Utils.checkState(streamingctx == null, "Streaming context is not null.");
-    streamingctx = new JavaStreamingContext(sc, new Duration(batchDuration));
+    streamingctx = new JavaStreamingContext(sparkEntries.sc(), new Duration(batchDuration));
   }
 
   @Override
   public synchronized void stopStreamingCtx() {
     Utils.checkState(streamingctx != null, "Streaming Context is null");
-    streamingctx.stop();
+    streamingctx.stop(false);
     streamingctx = null;
   }
 
@@ -132,9 +89,7 @@ class JobContextImpl implements JobContext {
     if (streamingctx != null) {
       stopStreamingCtx();
     }
-    if (sc != null) {
-      sc.stop();
-    }
+    sparkEntries.stop();
   }
 
   public void addFile(String path) {

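Two details worth noting in the JobContextImpl hunk: the job context no longer caches any Spark state itself but delegates every accessor to the shared SparkEntries, and stopStreamingCtx now calls stop(false) so that tearing down a streaming context does not also stop the SparkContext that the interpreters share. A minimal sketch of the delegation (toy types, not the real classes):

    // Entries stands in for SparkEntries; ToyJobContext for JobContextImpl.
    class Entries {
      lazy val sc: String     = "JavaSparkContext" // created once, shared by jobs and interpreters
      lazy val sqlctx: String = "SQLContext"
    }

    class ToyJobContext(entries: Entries) {
      // No per-context caching any more; everything is forwarded to the shared entries.
      def sc: String     = entries.sc
      def sqlctx: String = entries.sqlctx
    }
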
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/c1aafeb6/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index 20be563..0c95c31 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -275,13 +274,11 @@ public class RSCDriver extends BaseProtocol {
    * and returning a null context is allowed. In that case, the context exposed by
    * JobContext will be null.
    */
-  protected JavaSparkContext initializeContext() throws Exception {
-    long t1 = System.nanoTime();
-    LOG.info("Starting Spark context...");
-    JavaSparkContext sc = new JavaSparkContext(conf);
-    LOG.info("Spark context finished initialization in {}ms",
-      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1));
-    return sc;
+  protected SparkEntries initializeSparkEntries() throws Exception {
+    SparkEntries entries = new SparkEntries(conf);
+    // Explicitly call sc() to initialize SparkContext.
+    entries.sc();
+    return entries;
   }
 
   protected void onClientAuthenticated(final Rpc client) {
@@ -325,9 +322,9 @@ public class RSCDriver extends BaseProtocol {
     try {
       initializeServer();
 
-      JavaSparkContext sc = initializeContext();
+      SparkEntries entries = initializeSparkEntries();
       synchronized (jcLock) {
-        jc = new JobContextImpl(sc, localTmpDir, this);
+        jc = new JobContextImpl(entries, localTmpDir, this);
         jcLock.notifyAll();
       }