Posted to commits@livy.apache.org by va...@apache.org on 2018/11/29 18:09:48 UTC

[2/2] incubator-livy git commit: [LIVY-503] Separate thrift server session code in separate module.

[LIVY-503] Separate thrift server session code in separate module.

This change creates a new module ("livy-thriftserver-session") with
the code related to the Thrift server that is expected to run inside
a Spark session. The module contains a few Job implementations that
are used by the Thrift server to run SQL statements on the session
side, plus some internal bookkeeping code.
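
For context, the jobs follow the plain RSC Job pattern visible in the
diff below (e.g. CleanupStatementJob): a serializable class with a
no-arg constructor whose call() runs inside the Spark application. A
minimal sketch, with a hypothetical job name and payload:

    import org.apache.livy.Job;
    import org.apache.livy.JobContext;

    // Hypothetical example, not part of this commit; the real jobs
    // (SqlJob, FetchResultJob, ...) have the same shape.
    public class PingSessionJob implements Job<Boolean> {

      private final String sessionId;

      // No-arg constructor for deserialization, as in the jobs below.
      public PingSessionJob() {
        this(null);
      }

      public PingSessionJob(String sessionId) {
        this.sessionId = sessionId;
      }

      @Override
      public Boolean call(JobContext ctx) throws Exception {
        // The real jobs look up per-session bookkeeping state here,
        // e.g. ThriftSessionState.get(ctx, sessionId).
        return sessionId != null;
      }
    }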

The new module is a re-implementation of the existing Scala code in
Java; Java was chosen because it makes supporting different Scala
versions simpler, for a couple of reasons:

- no need to compile the module separately for different Scala versions,
  should that ever become an issue.
- better control of serialization, instead of relying on Scala types that
  may not be compatible across versions (imagine the Livy server and the
  Spark session using different Scala versions); see the sketch below.
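
As a sketch of the serialization point (hypothetical class, not part
of this commit): a payload restricted to JDK types has the same
serialized form whichever Scala version either side runs, while a
Scala collection field would pin the Livy server and the Spark
session to binary-compatible Scala releases.

    import java.io.Serializable;

    public class PortablePayload implements Serializable {
      private final String statement;   // safe: JDK type
      private final String[] initArgs;  // safe: array of a JDK type
      // A field such as scala.collection.immutable.HashMap<String, String>
      // would NOT be safe if the two sides use different Scala versions.

      public PortablePayload(String statement, String[] initArgs) {
        this.statement = statement;
        this.initArgs = initArgs;
      }
    }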

The change also removes the "jarLocation" configuration: in a release
package, the session jars are shipped with the rest of the RSC jars,
while for development, code was added to the RSC that adds the thrift
server jars to the session when a local build is detected.

Also included is a small change in the RSC pom so that the shaded jar
is generated in the correct target directory and later copied into the
staging directory; this solves an issue with using the RSC in test code
in other modules (such as the one being added).

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #118 from vanzin/LIVY-503.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/39fa887c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/39fa887c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/39fa887c

Branch: refs/heads/master
Commit: 39fa887cfa482c5edaba2ad0c2af683d8e5c63c6
Parents: ae2228f
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Nov 29 10:09:42 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Nov 29 10:09:42 2018 -0800

----------------------------------------------------------------------
 assembly/assembly.xml                           |   7 +
 assembly/pom.xml                                |   5 +
 pom.xml                                         |   4 +-
 rsc/pom.xml                                     |  23 +-
 .../org/apache/livy/rsc/ContextLauncher.java    |  20 +-
 .../main/scala/org/apache/livy/LivyConf.scala   |   1 -
 thriftserver/server/pom.xml                     |  10 +-
 .../LivyExecuteStatementOperation.scala         |  22 +-
 .../thriftserver/LivyThriftSessionManager.scala |  24 +-
 .../livy/thriftserver/rpc/RpcClient.scala       | 189 +---------
 .../livy/thriftserver/serde/ColumnBuffer.scala  | 186 ----------
 .../serde/ColumnOrientedResultSet.scala         |  35 --
 .../livy/thriftserver/types/DataType.scala      |  38 --
 .../livy/thriftserver/types/DataTypeUtils.scala | 145 ++------
 .../server/src/test/resources/log4j.properties  |  39 ++
 .../thriftserver/ThriftServerBaseTest.scala     |  11 +-
 thriftserver/session/pom.xml                    | 110 ++++++
 .../session/CleanupStatementJob.java            |  47 +++
 .../livy/thriftserver/session/ColumnBuffer.java | 361 +++++++++++++++++++
 .../livy/thriftserver/session/DataType.java     |  35 ++
 .../thriftserver/session/FetchResultJob.java    |  67 ++++
 .../session/FetchResultSchemaJob.java           |  46 +++
 .../session/RegisterSessionJob.java             |  45 +++
 .../livy/thriftserver/session/ResultSet.java    |  58 +++
 .../thriftserver/session/ScalaIterator.java     |  48 +++
 .../livy/thriftserver/session/SparkUtils.java   |  60 +++
 .../livy/thriftserver/session/SqlJob.java       |  92 +++++
 .../thriftserver/session/StatementState.java    |  42 +++
 .../session/ThriftSessionState.java             | 126 +++++++
 .../session/UnregisterSessionJob.java           |  48 +++
 .../thriftserver/session/ColumnBufferTest.java  | 256 +++++++++++++
 .../thriftserver/session/ThriftSessionTest.java | 169 +++++++++
 32 files changed, 1776 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/assembly/assembly.xml b/assembly/assembly.xml
index b4aa49c..83f3dfe 100644
--- a/assembly/assembly.xml
+++ b/assembly/assembly.xml
@@ -75,5 +75,12 @@
         <include>*</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>${project.parent.basedir}/thriftserver/session/target/jars</directory>
+      <outputDirectory>${assembly.name}/rsc-jars</outputDirectory>
+      <includes>
+        <include>*</include>
+      </includes>
+    </fileSet>
   </fileSets>
 </assembly>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index fddef02..cd9fb2e 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -94,6 +94,11 @@
           <artifactId>livy-thriftserver</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>livy-thriftserver-session</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e24827c..050bd3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1023,7 +1023,8 @@
       <id>thriftserver</id>
       <modules>
         <module>thriftserver/server</module>
-        <module>thriftserver/client</module> 
+        <module>thriftserver/session</module>
+        <module>thriftserver/client</module>
       </modules>
     </profile>
 
@@ -1054,7 +1055,6 @@
         <netty.spark-2.11.version>4.1.17.Final</netty.spark-2.11.version>
         <java.version>1.8</java.version>
         <py4j.version>0.10.7</py4j.version>
-        <json4s.version>3.2.11</json4s.version>
       </properties>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/rsc/pom.xml
----------------------------------------------------------------------
diff --git a/rsc/pom.xml b/rsc/pom.xml
index d42b082..a07e4bb 100644
--- a/rsc/pom.xml
+++ b/rsc/pom.xml
@@ -22,12 +22,11 @@
     <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
     <version>0.6.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.livy</groupId>
   <artifactId>livy-rsc</artifactId>
-  <version>0.6.0-incubating-SNAPSHOT</version>
-  <packaging>jar</packaging>
 
   <properties>
     <copyright.header>${asf.copyright.header}</copyright.header>
@@ -153,7 +152,6 @@
                   <shadedPattern>org.apache.livy.shaded.kryo</shadedPattern>
                 </relocation>
               </relocations>
-              <outputFile>${project.build.directory}/jars/${project.artifactId}-${project.version}.jar</outputFile>
             </configuration>
           </execution>
         </executions>
@@ -176,6 +174,25 @@
               <outputDirectory>${project.build.directory}/jars</outputDirectory>
             </configuration>
           </execution>
+          <execution>
+            <id>copy-rsc-jar</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.livy</groupId>
+                  <artifactId>livy-rsc</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${project.build.directory}/jars</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index 790f912..c7c5bc4 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -160,14 +160,28 @@ class ContextLauncher {
       Utils.checkState(livyHome != null,
         "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
       File rscJars = new File(livyHome, "rsc-jars");
+      List<File> allJars = new ArrayList<>();
       if (!rscJars.isDirectory()) {
         rscJars = new File(livyHome, "rsc/target/jars");
+
+        // To ease development, also add the thriftserver's session jars to the Spark app.
+        // On a release package, these jars should have been packaged in the proper "rsc-jars"
+        // directory.
+        File tsJars = new File(livyHome, "thriftserver/session/target/jars");
+        if (tsJars.isDirectory()) {
+          allJars.add(tsJars);
+        }
       }
+
       Utils.checkState(rscJars.isDirectory(),
-        "Cannot find 'client-jars' directory under LIVY_HOME.");
+        "Cannot find rsc jars directory under LIVY_HOME.");
+      allJars.add(rscJars);
+
       List<String> jars = new ArrayList<>();
-      for (File f : rscJars.listFiles()) {
-         jars.add(f.getAbsolutePath());
+      for (File dir : allJars) {
+        for (File f : dir.listFiles()) {
+           jars.add(f.getAbsolutePath());
+        }
       }
       livyJars = Utils.join(jars, ",");
     }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/server/src/main/scala/org/apache/livy/LivyConf.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 17030e9..63374f1 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -101,7 +101,6 @@ object LivyConf {
   val THRIFT_SERVER_ENABLED = Entry("livy.server.thrift.enabled", false)
   val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false)
   val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m")
-  val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null)
 
   /**
    * Recovery mode of Livy. Possible values:

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/pom.xml
----------------------------------------------------------------------
diff --git a/thriftserver/server/pom.xml b/thriftserver/server/pom.xml
index ef5870c..e1c26db 100644
--- a/thriftserver/server/pom.xml
+++ b/thriftserver/server/pom.xml
@@ -18,13 +18,15 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>livy-main</artifactId>
     <groupId>org.apache.livy</groupId>
     <version>0.6.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
+  <groupId>org.apache.livy</groupId>
   <artifactId>livy-thriftserver</artifactId>
 
   <dependencies>
@@ -80,6 +82,11 @@
 
     <dependency>
       <groupId>org.apache.livy</groupId>
+      <artifactId>livy-thriftserver-session</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-rsc</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
@@ -191,6 +198,7 @@
             <goals>
               <goal>wget</goal>
             </goals>
+            <phase>test-compile</phase>
             <configuration>
               <readTimeOut>60000</readTimeOut>
               <retries>5</retries>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
index 142eebf..c2c4716 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer
+import org.apache.hadoop.hive.serde2.thrift.{ColumnBuffer => ThriftColumnBuffer}
 import org.apache.hadoop.hive.shims.Utils
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.Operation
@@ -74,18 +74,17 @@ class LivyExecuteStatementOperation(
 
     // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
     val maxRows = maxRowsL.toInt
-    val jsonSchema = rpcClient.fetchResultSchema(statementId).get()
-    val types = getInternalTypes(jsonSchema)
-    val livyColumnResultSet = rpcClient.fetchResult(statementId, types, maxRows).get()
+    val resultSet = rpcClient.fetchResult(sessionHandle, statementId, maxRows).get()
 
-    val thriftColumns = livyColumnResultSet.columns.map { col =>
-      new ColumnBuffer(toHiveThriftType(col.dataType), col.getNulls, col.getColumnValues)
+    val thriftColumns = resultSet.getColumns().map { col =>
+      new ThriftColumnBuffer(toHiveThriftType(col.getType()), col.getNulls(), col.getValues())
     }
-    val result = new ColumnBasedSet(tableSchemaFromSparkJson(jsonSchema).toTypeDescriptors,
+    val result = new ColumnBasedSet(
+      toHiveTableSchema(resultSet.getSchema()).toTypeDescriptors,
       thriftColumns.toList.asJava,
       rowOffset)
-    livyColumnResultSet.columns.headOption.foreach { c =>
-      rowOffset += c.size
+    if (resultSet.getColumns() != null && resultSet.getColumns().length > 0) {
+      rowOffset += resultSet.getColumns()(0).size()
     }
     result
   }
@@ -173,7 +172,8 @@ class LivyExecuteStatementOperation(
   }
 
   def getResultSetSchema: TableSchema = {
-    val tableSchema = tableSchemaFromSparkJson(rpcClient.fetchResultSchema(statementId).get())
+    val tableSchema = toHiveTableSchema(
+      rpcClient.fetchResultSchema(sessionHandle, statementId).get())
     // Workaround for operations returning an empty schema (eg. CREATE, INSERT, ...)
     if (tableSchema.getSize == 0) {
       tableSchema.addStringColumn("Result", "")
@@ -183,7 +183,7 @@ class LivyExecuteStatementOperation(
 
   private def cleanup(state: OperationState) {
     if (statementId != null && rpcClientValid) {
-      rpcClient.cleanupStatement(statementId).get()
+      rpcClient.cleanupStatement(sessionHandle, statementId).get()
     }
     setState(state)
   }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 2dca1d4..344a990 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -18,7 +18,6 @@
 package org.apache.livy.thriftserver
 
 import java.lang.reflect.UndeclaredThrowableException
-import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util
 import java.util.{Date, Map => JMap, UUID}
@@ -28,8 +27,8 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.Duration
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.hive.conf.HiveConf
@@ -203,19 +202,6 @@ class LivyThriftSessionManager(val server: LivyThriftServer, hiveConf: HiveConf)
       sessionHandle: SessionHandle,
       livySession: InteractiveSession,
       initStatements: List[String]): Unit = {
-    // Add the thriftserver jar to Spark application as we need to deserialize there the classes
-    // which handle the job submission.
-    // Note: if this is an already existing session, adding the JARs multiple times is not a
-    // problem as Spark ignores JARs which have already been added.
-    try {
-      livySession.addJar(LivyThriftSessionManager.thriftserverJarLocation(server.livyConf))
-    } catch {
-      case e: java.util.concurrent.ExecutionException
-          if Option(e.getCause).forall(_.getMessage.contains("has already been uploaded")) =>
-        // We have already uploaded the jar to this session, we can ignore this error
-        debug(e.getMessage, e)
-    }
-
     val rpcClient = new RpcClient(livySession)
     rpcClient.executeRegisterSession(sessionHandle).get()
     initStatements.foreach { statement =>
@@ -223,7 +209,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, hiveConf: HiveConf)
       try {
         rpcClient.executeSql(sessionHandle, statementId, statement).get()
       } finally {
-        Try(rpcClient.cleanupStatement(statementId).get()).failed.foreach { e =>
+        Try(rpcClient.cleanupStatement(sessionHandle, statementId).get()).failed.foreach { e =>
           error(s"Failed to close init operation $statementId", e)
         }
       }
@@ -556,12 +542,6 @@ object LivyThriftSessionManager extends Logging {
   private val livySessionIdConfigKey = "set:hiveconf:livy.server.sessionId"
   private val livySessionConfRegexp = "set:hiveconf:livy.session.conf.(.*)".r
   private val hiveVarPattern = "set:hivevar:(.*)".r
-  private val JAR_LOCATION = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
-
-  def thriftserverJarLocation(livyConf: LivyConf): URI = {
-    Option(livyConf.get(LivyConf.THRIFT_SERVER_JAR_LOCATION)).map(new URI(_))
-      .getOrElse(JAR_LOCATION)
-  }
 
   private def convertConfValueToInt(key: String, value: String) = {
     val res = Try(value.toInt)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
index 605a810..beba6a9 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
@@ -17,24 +17,13 @@
 
 package org.apache.livy.thriftserver.rpc
 
-import java.lang.reflect.InvocationTargetException
-
-import scala.collection.immutable.HashMap
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
 import org.apache.hive.service.cli.SessionHandle
-import org.apache.spark.sql.{Row, SparkSession}
 
 import org.apache.livy._
 import org.apache.livy.server.interactive.InteractiveSession
-import org.apache.livy.thriftserver.serde.ColumnOrientedResultSet
-import org.apache.livy.thriftserver.types.DataType
-import org.apache.livy.utils.LivySparkUtils
+import org.apache.livy.thriftserver.session._
 
 class RpcClient(livySession: InteractiveSession) extends Logging {
-  import RpcClient._
-
   private val defaultIncrementalCollect =
     livySession.livyConf.getBoolean(LivyConf.THRIFT_INCR_COLLECT_ENABLED).toString
 
@@ -56,7 +45,8 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
     require(null != statementId, s"Invalid statementId specified. StatementId = $statementId")
     require(null != statement, s"Invalid statement specified. StatementId = $statement")
     livySession.recordActivity()
-    rscClient.submit(executeSqlJob(sessionId(sessionHandle),
+    rscClient.submit(new SqlJob(
+      sessionId(sessionHandle),
       statementId,
       statement,
       defaultIncrementalCollect,
@@ -64,29 +54,33 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
   }
 
   @throws[Exception]
-  def fetchResult(statementId: String,
-      types: Array[DataType],
-      maxRows: Int): JobHandle[ColumnOrientedResultSet] = {
+  def fetchResult(
+      sessionHandle: SessionHandle,
+      statementId: String,
+      maxRows: Int): JobHandle[ResultSet] = {
     info(s"RSC client is fetching result for statementId $statementId with $maxRows maxRows.")
     require(null != statementId, s"Invalid statementId specified. StatementId = $statementId")
     livySession.recordActivity()
-    rscClient.submit(fetchResultJob(statementId, types, maxRows))
+    rscClient.submit(new FetchResultJob(sessionId(sessionHandle), statementId, maxRows))
   }
 
   @throws[Exception]
-  def fetchResultSchema(statementId: String): JobHandle[String] = {
+  def fetchResultSchema(sessionHandle: SessionHandle, statementId: String): JobHandle[String] = {
     info(s"RSC client is fetching result schema for statementId = $statementId")
     require(null != statementId, s"Invalid statementId specified. statementId = $statementId")
     livySession.recordActivity()
-    rscClient.submit(fetchResultSchemaJob(statementId))
+    rscClient.submit(new FetchResultSchemaJob(sessionId(sessionHandle), statementId))
   }
 
   @throws[Exception]
-  def cleanupStatement(statementId: String, cancelJob: Boolean = false): JobHandle[_] = {
+  def cleanupStatement(
+      sessionHandle: SessionHandle,
+      statementId: String,
+      cancelJob: Boolean = false): JobHandle[_] = {
     info(s"Cleaning up remote session for statementId = $statementId")
     require(null != statementId, s"Invalid statementId specified. statementId = $statementId")
     livySession.recordActivity()
-    rscClient.submit(cleanupStatementJob(statementId))
+    rscClient.submit(new CleanupStatementJob(sessionId(sessionHandle), statementId))
   }
 
   /**
@@ -99,7 +93,7 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
   def executeRegisterSession(sessionHandle: SessionHandle): JobHandle[_] = {
     info(s"RSC client is executing register session $sessionHandle")
     livySession.recordActivity()
-    rscClient.submit(registerSessionJob(sessionId(sessionHandle)))
+    rscClient.submit(new RegisterSessionJob(sessionId(sessionHandle)))
   }
 
   /**
@@ -109,155 +103,6 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
   def executeUnregisterSession(sessionHandle: SessionHandle): JobHandle[_] = {
     info(s"RSC client is executing unregister session $sessionHandle")
     livySession.recordActivity()
-    rscClient.submit(unregisterSessionJob(sessionId(sessionHandle)))
-  }
-}
-
-/**
- * As remotely we don't have any class instance, all the job definitions are placed here in
- * order to enforce that we are not accessing any class attribute
- */
-object RpcClient {
-  // Maps a session ID to its SparkSession.
-  val SESSION_SPARK_ENTRY_MAP = "livy.thriftserver.rpc_sessionIdToSparkSQLSession"
-  val STATEMENT_RESULT_ITER_MAP = "livy.thriftserver.rpc_statementIdToResultIter"
-  val STATEMENT_SCHEMA_MAP = "livy.thriftserver.rpc_statementIdToSchema"
-
-  private def registerSessionJob(sessionId: String): Job[_] = new Job[Boolean] {
-    override def call(jc: JobContext): Boolean = {
-      val spark = jc.sparkSession[SparkSession]()
-      val sessionSpecificSpark = spark.newSession()
-      jc.sc().synchronized {
-        val existingMap =
-          Try(jc.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP))
-            .getOrElse(new HashMap[String, AnyRef]())
-        jc.setSharedObject(SESSION_SPARK_ENTRY_MAP,
-          existingMap + ((sessionId, sessionSpecificSpark)))
-        Try(jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP))
-          .failed.foreach { _ =>
-          jc.setSharedObject(STATEMENT_SCHEMA_MAP, new HashMap[String, String]())
-        }
-        Try(jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP))
-          .failed.foreach { _ =>
-          jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, new HashMap[String, Iterator[Row]]())
-        }
-      }
-      true
-    }
-  }
-
-  private def unregisterSessionJob(sessionId: String): Job[_] = new Job[Boolean] {
-    override def call(jobContext: JobContext): Boolean = {
-      jobContext.sc().synchronized {
-        val existingMap =
-          jobContext.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP)
-        jobContext.setSharedObject(SESSION_SPARK_ENTRY_MAP, existingMap - sessionId)
-      }
-      true
-    }
-  }
-
-  private def cleanupStatementJob(statementId: String): Job[_] = new Job[Boolean] {
-    override def call(jc: JobContext): Boolean = {
-      val sparkContext = jc.sc()
-      sparkContext.cancelJobGroup(statementId)
-      sparkContext.synchronized {
-        // Clear job group only if current job group is same as expected job group.
-        if (sparkContext.getLocalProperty("spark.jobGroup.id") == statementId) {
-          sparkContext.clearJobGroup()
-        }
-        val iterMap = jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)
-        jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, iterMap - statementId)
-        val schemaMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)
-        jc.setSharedObject(STATEMENT_SCHEMA_MAP, schemaMap - statementId)
-      }
-      true
-    }
-  }
-
-  private def fetchResultSchemaJob(statementId: String): Job[String] = new Job[String] {
-    override def call(jobContext: JobContext): String = {
-      jobContext.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)(statementId)
-    }
-  }
-
-  private def fetchResultJob(statementId: String,
-      types: Array[DataType],
-      maxRows: Int): Job[ColumnOrientedResultSet] = new Job[ColumnOrientedResultSet] {
-    override def call(jobContext: JobContext): ColumnOrientedResultSet = {
-      val statementIterMap =
-        jobContext.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)
-      val iter = statementIterMap(statementId)
-
-      if (null == iter) {
-        // Previous query execution failed.
-        throw new NoSuchElementException("No successful query executed for output")
-      }
-
-      val resultSet = new ColumnOrientedResultSet(types)
-      val numOfColumns = types.length
-      if (!iter.hasNext) {
-        resultSet
-      } else {
-        var curRow = 0
-        while (curRow < maxRows && iter.hasNext) {
-          val sparkRow = iter.next()
-          val row = ArrayBuffer[Object]()
-          var curCol: Integer = 0
-          while (curCol < numOfColumns) {
-            row += sparkRow.get(curCol).asInstanceOf[Object]
-            curCol += 1
-          }
-          resultSet.addRow(row.toArray)
-          curRow += 1
-        }
-        resultSet
-      }
-    }
-  }
-
-  private def executeSqlJob(sessionId: String,
-      statementId: String,
-      statement: String,
-      defaultIncrementalCollect: String,
-      incrementalCollectEnabledProp: String): Job[_] = new Job[Boolean] {
-    override def call(jc: JobContext): Boolean = {
-      val sparkContext = jc.sc()
-      sparkContext.synchronized {
-        sparkContext.setJobGroup(statementId, statement)
-      }
-      val spark =
-        jc.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP)(sessionId)
-      try {
-        val result = spark.sql(statement)
-        val jsonSchema = result.schema.json
-
-        // Set the schema in the shared map
-        sparkContext.synchronized {
-          val existingMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)
-          jc.setSharedObject(STATEMENT_SCHEMA_MAP, existingMap + ((statementId, jsonSchema)))
-        }
-
-        val incrementalCollect = spark.conf.get(incrementalCollectEnabledProp,
-          defaultIncrementalCollect).toBoolean
-
-        val iter = if (incrementalCollect) {
-          result.rdd.toLocalIterator
-        } else {
-          result.collect().iterator
-        }
-
-        // Set the iterator in the shared map
-        sparkContext.synchronized {
-          val existingMap =
-            jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)
-          jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, existingMap + ((statementId, iter)))
-        }
-      } catch {
-        case e: InvocationTargetException => throw e.getCause
-      }
-
-      true
-    }
+    rscClient.submit(new UnregisterSessionJob(sessionId(sessionHandle)))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala
deleted file mode 100644
index 248a77d..0000000
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnBuffer.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.thriftserver.serde
-
-import java.nio.ByteBuffer
-import java.util
-
-import scala.collection.mutable
-
-import org.apache.livy.thriftserver.types.{DataType, DataTypeUtils}
-
-object ColumnBuffer {
-  private val DEFAULT_SIZE = 100
-  private val EMPTY_BINARY = ByteBuffer.allocate(0)
-  private val EMPTY_STRING = ""
-  private val HANDLED_TYPES =
-    Set("boolean", "byte", "short", "integer", "long", "float", "double", "binary")
-}
-
-class ColumnBuffer(val dataType: DataType) {
-  private val nulls = new mutable.BitSet()
-  private var currentSize = 0
-  private var boolVars: Array[Boolean] = _
-  private var byteVars: Array[Byte] = _
-  private var shortVars: Array[Short] = _
-  private var intVars: Array[Int] = _
-  private var longVars: Array[Long] = _
-  private var doubleVars: Array[Double] = _
-  private var stringVars: util.List[String] = _
-  private var binaryVars: util.List[ByteBuffer] = _
-
-  dataType.name match {
-    case "boolean" =>
-      boolVars = new Array[Boolean](ColumnBuffer.DEFAULT_SIZE)
-    case "byte" =>
-      byteVars = new Array[Byte](ColumnBuffer.DEFAULT_SIZE)
-    case "short" =>
-      shortVars = new Array[Short](ColumnBuffer.DEFAULT_SIZE)
-    case "integer" =>
-      intVars = new Array[Int](ColumnBuffer.DEFAULT_SIZE)
-    case "long" =>
-      longVars = new Array[Long](ColumnBuffer.DEFAULT_SIZE)
-    case "float" | "double" =>
-      doubleVars = new Array[Double](ColumnBuffer.DEFAULT_SIZE)
-    case "binary" =>
-      binaryVars = new util.ArrayList[ByteBuffer]
-    case "void" => // all NULLs, nothing to do
-    case _ =>
-      stringVars = new util.ArrayList[String]
-  }
-
-  def get(index: Int): Any = {
-    if (this.nulls(index)) {
-      null
-    } else {
-      dataType.name match {
-        case "boolean" =>
-          boolVars(index)
-        case "byte" =>
-          byteVars(index)
-        case "short" =>
-          shortVars(index)
-        case "integer" =>
-          intVars(index)
-        case "long" =>
-          longVars(index)
-        case "float" | "double" =>
-          doubleVars(index)
-        case "binary" =>
-          binaryVars.get(index).array()
-        case _ =>
-          stringVars.get(index)
-      }
-    }
-  }
-
-  def size: Int = currentSize
-
-  def addValue(field: Any): Unit = {
-    if (field == null) {
-      nulls += currentSize
-      if (!ColumnBuffer.HANDLED_TYPES.contains(dataType.name)) {
-        stringVars.add(ColumnBuffer.EMPTY_STRING)
-      } else if (dataType.name == "binary") {
-        binaryVars.add(ColumnBuffer.EMPTY_BINARY)
-      }
-    } else {
-      dataType.name match {
-        case "boolean" =>
-          ensureBoolVarsSize()
-          boolVars(currentSize) = field.asInstanceOf[Boolean]
-        case "byte" =>
-          ensureByteVarsSize()
-          byteVars(currentSize) = field.asInstanceOf[Byte]
-        case "short" =>
-          ensureShortVarsSize()
-          shortVars(currentSize) = field.asInstanceOf[Short]
-        case "integer" =>
-          ensureIntVarsSize()
-          intVars(currentSize) = field.asInstanceOf[Int]
-        case "long" =>
-          ensureLongVarsSize()
-          longVars(currentSize) = field.asInstanceOf[Long]
-        case "float" =>
-          ensureDoubleVarsSize()
-          // We need to convert the float to string and then to double in order to avoid precision
-          // issues caused by the poor precision of Float
-          doubleVars(currentSize) = field.toString.toDouble
-        case "double" =>
-          ensureDoubleVarsSize()
-          doubleVars(currentSize) = field.asInstanceOf[Double]
-        case "binary" =>
-          binaryVars.add(ByteBuffer.wrap(field.asInstanceOf[Array[Byte]]))
-        case _ =>
-          stringVars.add(DataTypeUtils.toHiveString(field, dataType))
-      }
-    }
-
-    currentSize += 1
-  }
-
-  private def ensureBoolVarsSize(): Unit = if (boolVars.length == currentSize) {
-    val newVars = new Array[Boolean](currentSize << 1)
-    System.arraycopy(boolVars, 0, newVars, 0, currentSize)
-    boolVars = newVars
-  }
-
-  private def ensureByteVarsSize(): Unit = if (byteVars.length == currentSize) {
-    val newVars = new Array[Byte](currentSize << 1)
-    System.arraycopy(byteVars, 0, newVars, 0, currentSize)
-    byteVars = newVars
-  }
-
-  private def ensureShortVarsSize(): Unit = if (shortVars.length == currentSize) {
-    val newVars = new Array[Short](currentSize << 1)
-    System.arraycopy(shortVars, 0, newVars, 0, currentSize)
-    shortVars = newVars
-  }
-
-  private def ensureIntVarsSize(): Unit = if (intVars.length == currentSize) {
-    val newVars = new Array[Int](currentSize << 1)
-    System.arraycopy(intVars, 0, newVars, 0, currentSize)
-    intVars = newVars
-  }
-
-  private def ensureLongVarsSize(): Unit = if (longVars.length == currentSize) {
-    val newVars = new Array[Long](currentSize << 1)
-    System.arraycopy(longVars, 0, newVars, 0, currentSize)
-    longVars = newVars
-  }
-
-  private def ensureDoubleVarsSize(): Unit = if (doubleVars.length == currentSize) {
-    val newVars = new Array[Double](currentSize << 1)
-    System.arraycopy(doubleVars, 0, newVars, 0, currentSize)
-    doubleVars = newVars
-  }
-
-  private[thriftserver] def getColumnValues: Any = dataType.name match {
-    case "boolean" => boolVars.take(currentSize)
-    case "byte" => byteVars.take(currentSize)
-    case "short" => shortVars.take(currentSize)
-    case "integer" => intVars.take(currentSize)
-    case "long" => longVars.take(currentSize)
-    case "float" => doubleVars.take(currentSize)
-    case "double" => doubleVars.take(currentSize)
-    case "binary" => binaryVars
-    case _ => stringVars
-  }
-
-  private[thriftserver] def getNulls: util.BitSet = util.BitSet.valueOf(nulls.toBitMask)
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala
deleted file mode 100644
index 64b6e4e..0000000
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ColumnOrientedResultSet.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.thriftserver.serde
-
-import org.apache.livy.thriftserver.types.DataType
-
-/**
- * Utility class for (de-)serialize the results from the Spark application and Livy thriftserver.
- */
-class ColumnOrientedResultSet(val types: Array[DataType]) {
-  val columns: Array[ColumnBuffer] = types.map(new ColumnBuffer(_))
-  def addRow(fields: Array[AnyRef]): Unit = {
-    var i = 0
-    while (i < fields.length) {
-      val field = fields(i)
-      columns(i).addValue(field)
-      i += 1
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala
deleted file mode 100644
index c942b46..0000000
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataType.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.thriftserver.types
-
-private[thriftserver] trait DataType {
-  def name: String
-}
-
-private[thriftserver] case class BasicDataType(name: String) extends DataType
-
-private[thriftserver] case class StructField(name: String, dataType: DataType)
-
-private[thriftserver]case class StructType(fields: Array[StructField]) extends DataType {
-  val name = "struct"
-}
-
-private[thriftserver] case class ArrayType(elementsType: DataType) extends DataType {
-  val name = "array"
-}
-
-private[thriftserver] case class MapType(keyType: DataType, valueType: DataType) extends DataType {
-  val name = "map"
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
index 1644fdb..f61dd30 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
@@ -26,28 +26,53 @@ import org.json4s.{DefaultFormats, JValue}
 import org.json4s.JsonAST.{JObject, JString}
 import org.json4s.jackson.JsonMethods.parse
 
-import org.apache.livy.Logging
+import org.apache.livy.thriftserver.session.DataType
 
 /**
  * Utility class for converting and representing Spark and Hive data types.
  */
-object DataTypeUtils extends Logging {
+object DataTypeUtils {
   // Used for JSON conversion
   private implicit val formats = DefaultFormats
 
-  private def toHive(jValue: JValue): String = {
-    jValue match {
-      case JString(t) => primitiveToHive(t)
-      case o: JObject => complexToHive(o)
-      case _ => throw new IllegalArgumentException(
-        s"Spark type was neither a string nor a object. It was: $jValue.")
+  /**
+   * Returns the Hive [[Type]] used in the thrift communications for the given Livy type.
+   */
+  def toHiveThriftType(ltype: DataType): Type = {
+    ltype match {
+      case DataType.BOOLEAN => Type.BOOLEAN_TYPE
+      case DataType.BYTE => Type.TINYINT_TYPE
+      case DataType.SHORT => Type.SMALLINT_TYPE
+      case DataType.INTEGER => Type.INT_TYPE
+      case DataType.LONG => Type.BIGINT_TYPE
+      case DataType.FLOAT => Type.FLOAT_TYPE
+      case DataType.DOUBLE => Type.DOUBLE_TYPE
+      case DataType.BINARY => Type.BINARY_TYPE
+      case _ => Type.STRING_TYPE
+    }
+  }
+
+  /**
+   * Converts a JSON representing the Spark schema (the one returned by `df.schema.json`) into
+   * a Hive [[TableSchema]] instance.
+   *
+   * @param sparkJson a [[String]] containing the JSON representation of a Spark Dataframe schema
+   * @return a [[TableSchema]] representing the schema provided as input
+   */
+  def toHiveTableSchema(sparkJson: String): TableSchema = {
+    val schema = parse(sparkJson) \ "fields"
+    val fields = schema.children.map { field =>
+      val name = (field \ "name").extract[String]
+      val hiveType = toHive(field \ "type")
+      new FieldSchema(name, hiveType, "")
     }
+    new TableSchema(fields.asJava)
   }
 
-  private def getInternalType(jValue: JValue): DataType = {
+  private def toHive(jValue: JValue): String = {
     jValue match {
-      case JString(t) => BasicDataType(t)
-      case o: JObject => complexToInternal(o)
+      case JString(t) => primitiveToHive(t)
+      case o: JObject => complexToHive(o)
       case _ => throw new IllegalArgumentException(
         s"Spark type was neither a string nor a object. It was: $jValue.")
     }
@@ -77,102 +102,4 @@ object DataTypeUtils extends Logging {
       case "udt" => toHive(sparkType \ "sqlType")
     }
   }
-
-  private def complexToInternal(sparkType: JObject): DataType = {
-    (sparkType \ "type").extract[String] match {
-      case "array" => ArrayType(getInternalType(sparkType \ "elementType"))
-      case "struct" =>
-        val fields = (sparkType \ "fields").children.map { f =>
-          StructField((f \ "name").extract[String], getInternalType(f \ "type"))
-        }
-        StructType(fields.toArray)
-      case "map" =>
-        MapType(getInternalType(sparkType \ "keyType"), getInternalType(sparkType \ "valueType"))
-      case "udt" => getInternalType(sparkType \ "sqlType")
-    }
-  }
-
-  /**
-   * Converts a JSON representing the Spark schema (the one returned by `df.schema.json`) into
-   * a Hive [[TableSchema]] instance.
-   *
-   * @param sparkJson a [[String]] containing the JSON representation of a Spark Dataframe schema
-   * @return a [[TableSchema]] representing the schema provided as input
-   */
-  def tableSchemaFromSparkJson(sparkJson: String): TableSchema = {
-    val schema = parse(sparkJson) \ "fields"
-    val fields = schema.children.map { field =>
-      val name = (field \ "name").extract[String]
-      val hiveType = toHive(field \ "type")
-      new FieldSchema(name, hiveType, "")
-    }
-    new TableSchema(fields.asJava)
-  }
-
-  /**
-   * Extracts the main type of each column contained in the JSON. This means that complex types
-   * are not returned in their full representation with the nested types: eg. for an array of any
-   * kind of data it returns `"array"`.
-   *
-   * @param sparkJson a [[String]] containing the JSON representation of a Spark Dataframe schema
-   * @return an [[Array]] of the principal type of the columns is the schema.
-   */
-  def getInternalTypes(sparkJson: String): Array[DataType] = {
-    val schema = parse(sparkJson) \ "fields"
-    schema.children.map { field =>
-      getInternalType(field \ "type")
-    }.toArray
-  }
-
-  /**
-   * Returns the Hive [[Type]] used in the thrift communications for {@param thriftDt}.
-   */
-  def toHiveThriftType(thriftDt: DataType): Type = {
-    thriftDt.name match {
-      case "boolean" => Type.BOOLEAN_TYPE
-      case "byte" => Type.TINYINT_TYPE
-      case "short" => Type.SMALLINT_TYPE
-      case "integer" => Type.INT_TYPE
-      case "long" => Type.BIGINT_TYPE
-      case "float" => Type.FLOAT_TYPE
-      case "double" => Type.DOUBLE_TYPE
-      case "binary" => Type.BINARY_TYPE
-      case _ => Type.STRING_TYPE
-    }
-  }
-
-  def toHiveString(value: Any, dt: DataType): String = (value, dt) match {
-    case (null, _) => "NULL"
-    case (struct: Any, StructType(fields)) =>
-      val values = struct.getClass.getMethod("toSeq").invoke(struct).asInstanceOf[Seq[Any]]
-      values.zip(fields).map {
-        case (v, t) => s""""${t.name}":${toHiveComplexTypeFieldString((v, t.dataType))}"""
-      }.mkString("{", ",", "}")
-    case (seq: Seq[_], ArrayType(t)) =>
-      seq.map(v => (v, t)).map(toHiveComplexTypeFieldString).mkString("[", ",", "]")
-    case (map: Map[_, _], MapType(kType, vType)) =>
-      map.map { case (k, v) =>
-        s"${toHiveComplexTypeFieldString((k, kType))}:${toHiveComplexTypeFieldString((v, vType))}"
-      }.toSeq.sorted.mkString("{", ",", "}")
-    case (decimal: java.math.BigDecimal, t) if t.name.startsWith("decimal") =>
-      decimal.stripTrailingZeros.toString
-    case (other, _) => other.toString
-  }
-
-  def toHiveComplexTypeFieldString(a: (Any, DataType)): String = a match {
-    case (null, _) => "null"
-    case (struct: Any, StructType(fields)) =>
-      val values = struct.getClass.getMethod("toSeq").invoke(struct).asInstanceOf[Seq[Any]]
-      values.zip(fields).map {
-        case (v, t) => s""""${t.name}":${toHiveComplexTypeFieldString((v, t.dataType))}"""
-      }.mkString("{", ",", "}")
-    case (seq: Seq[_], ArrayType(t)) =>
-      seq.map(v => (v, t)).map(toHiveComplexTypeFieldString).mkString("[", ",", "]")
-    case (map: Map[_, _], MapType(kType, vType)) =>
-      map.map { case (k, v) =>
-        s"${toHiveComplexTypeFieldString((k, kType))}:${toHiveComplexTypeFieldString((v, vType))}"
-      }.toSeq.sorted.mkString("{", ",", "}")
-    case (s: String, t) if t.name == "string" => s""""$s""""
-    case (other, _) => other.toString
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/test/resources/log4j.properties b/thriftserver/server/src/test/resources/log4j.properties
new file mode 100644
index 0000000..72bab70
--- /dev/null
+++ b/thriftserver/server/src/test/resources/log4j.properties
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+test.appender=file
+log4j.rootCategory=WARN, ${test.appender}
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t: %m%n
+
+# Enable DEBUG logs for Livy classes (with some exceptions).
+log4j.logger.org.apache.livy=DEBUG
+log4j.logger.org.apache.livy.rsc=INFO
+log4j.logger.org.apache.livy.shaded=INFO
+log4j.logger.org.apache.livy.thriftserver.LivyCLIService=INFO

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
----------------------------------------------------------------------
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
index d1ffd12..f1fb247 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
@@ -17,7 +17,6 @@
 
 package org.apache.livy.thriftserver
 
-import java.io.File
 import java.sql.{Connection, DriverManager, Statement}
 
 import org.apache.hadoop.hive.conf.HiveConf
@@ -25,13 +24,12 @@ import org.apache.hive.jdbc.HiveDriver
 import org.apache.hive.service.Service.STATE
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.livy.LIVY_VERSION
 import org.apache.livy.LivyConf
 import org.apache.livy.LivyConf.{LIVY_SPARK_SCALA_VERSION, LIVY_SPARK_VERSION}
 import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.{SessionStore, StateStore}
 import org.apache.livy.sessions.InteractiveSessionManager
-import org.apache.livy.utils.LivySparkUtils.{formatSparkVersion, sparkScalaVersion, sparkSubmitVersion, testSparkVersion}
+import org.apache.livy.utils.LivySparkUtils.{formatSparkVersion, sparkScalaVersion, sparkSubmitVersion}
 
 object ServerMode extends Enumeration {
   val binary, http = Value
@@ -65,13 +63,6 @@ abstract class ThriftServerBaseTest extends FunSuite with BeforeAndAfterAll {
       s"livy.${HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT}"
     }
     livyConf.set(portConfKey, port.toString)
-    val home = sys.env("LIVY_HOME")
-    val thriftserverJarName = s"livy-thriftserver-${LIVY_VERSION}.jar"
-    val thriftserverJarFile = Option(new File(home, s"jars/$thriftserverJarName"))
-      .filter(_.exists())
-      .getOrElse(new File(home, s"thriftserver/server/target/jars/$thriftserverJarName"))
-    livyConf.set(LivyConf.THRIFT_SERVER_JAR_LOCATION, thriftserverJarFile.getAbsolutePath)
-    livyConf.set(LivyConf.LOCAL_FS_WHITELIST, thriftserverJarFile.getParent)
 
     // Set formatted Spark and Scala version into livy configuration, this will be used by
     // session creation.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/pom.xml
----------------------------------------------------------------------
diff --git a/thriftserver/session/pom.xml b/thriftserver/session/pom.xml
new file mode 100644
index 0000000..a817abe
--- /dev/null
+++ b/thriftserver/session/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.livy</groupId>
+    <artifactId>livy-main</artifactId>
+    <version>0.6.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.livy</groupId>
+  <artifactId>livy-thriftserver-session</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>livy-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>livy-rsc</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>livy-thriftserver-session</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${project.build.directory}/jars</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <!--
+      Override the json4s version to match Spark 2.4's. This module doesn't use json4s, but the
+      Spark APIs called in the tests require a different version of json4s than Livy's (and Livy
+      doesn't really work with Spark's version yet).
+    -->
+    <profile>
+      <id>spark-2.4</id>
+      <properties>
+        <json4s.version>3.5.3</json4s.version>
+      </properties>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
new file mode 100644
index 0000000..b8a0ee8
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * Job used to clean up state held for a statement.
+ */
+public class CleanupStatementJob implements Job<Void> {
+
+  private final String sessionId;
+  private final String statementId;
+
+  public CleanupStatementJob() {
+    this(null, null);
+  }
+
+  public CleanupStatementJob(String sessionId, String statementId) {
+    this.sessionId = sessionId;
+    this.statementId = statementId;
+  }
+
+  @Override
+  public Void call(JobContext ctx) {
+    ThriftSessionState session = ThriftSessionState.get(ctx, sessionId);
+    session.cleanupStatement(statementId);
+    return null;
+  }
+
+}

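For context, a minimal sketch of driving this job through Livy's public Job API. The endpoint and the session/statement IDs are placeholders; the Thrift server itself submits these jobs through its internal RPC client rather than this interface.

    // Illustrative only: hypothetical endpoint and IDs, public client API.
    import java.net.URI;
    import org.apache.livy.LivyClient;
    import org.apache.livy.LivyClientBuilder;
    import org.apache.livy.thriftserver.session.CleanupStatementJob;

    public class CleanupSketch {
      public static void main(String[] args) throws Exception {
        LivyClient client = new LivyClientBuilder()
          .setURI(new URI("http://localhost:8998"))  // hypothetical Livy URL
          .build();
        try {
          // Drop the state kept for a finished statement.
          client.submit(new CleanupStatementJob("session-1", "stmt-1")).get();
        } finally {
          client.stop(true);  // shut down the client and the remote context
        }
      }
    }
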
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
new file mode 100644
index 0000000..585d581
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.thriftserver.session;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import scala.Tuple2;
+import scala.collection.Map;
+import scala.collection.Seq;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+
+/**
+ * Container for the contents of a single column in a result set.
+ */
+public class ColumnBuffer {
+
+  public static final int DEFAULT_SIZE = 100;
+  private static final String EMPTY_STRING = "";
+  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]);
+
+  private final DataType type;
+
+  /**
+   * This is a hack around the fact that Kryo cannot properly serialize an instance of
+   * java.util.BitSet, because the data are stored in transient fields. So we manually
+   * implement a bit set instead.
+   */
+  private byte[] nulls;
+
+  private int currentSize;
+  private boolean[] bools;
+  private byte[] bytes;
+  private short[] shorts;
+  private int[] ints;
+  private long[] longs;
+  private float[] floats;
+  private double[] doubles;
+  private String[] strings;
+  private byte[][] buffers;
+
+  public ColumnBuffer() {
+    this.type = null;
+  }
+
+  public ColumnBuffer(DataType type) {
+    this.type = type;
+
+    switch (type) {
+    case BOOLEAN:
+      bools = new boolean[DEFAULT_SIZE];
+      break;
+    case BYTE:
+      bytes = new byte[DEFAULT_SIZE];
+      break;
+    case SHORT:
+      shorts = new short[DEFAULT_SIZE];
+      break;
+    case INTEGER:
+      ints = new int[DEFAULT_SIZE];
+      break;
+    case LONG:
+      longs = new long[DEFAULT_SIZE];
+      break;
+    case FLOAT:
+      floats = new float[DEFAULT_SIZE];
+      break;
+    case DOUBLE:
+      doubles = new double[DEFAULT_SIZE];
+      break;
+    case BINARY:
+      buffers = new byte[DEFAULT_SIZE][];
+      break;
+    case STRING:
+      strings = new String[DEFAULT_SIZE];
+      break;
+    }
+  }
+
+  public DataType getType() {
+    return type;
+  }
+
+  public Object get(int index) {
+    if (index >= currentSize) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+
+    if (isNull(index)) {
+      return null;
+    }
+
+    switch (type) {
+    case BOOLEAN:
+      return bools[index];
+    case BYTE:
+      return bytes[index];
+    case SHORT:
+      return shorts[index];
+    case INTEGER:
+      return ints[index];
+    case LONG:
+      return longs[index];
+    case FLOAT:
+      return floats[index];
+    case DOUBLE:
+      return doubles[index];
+    case BINARY:
+      return ByteBuffer.wrap(buffers[index]);
+    case STRING:
+      return strings[index];
+    }
+
+    throw new IllegalStateException("ShouldNotReachHere()");
+  }
+
+  public int size() {
+    return currentSize;
+  }
+
+  public void add(Object value) {
+    if (value == null) {
+      setNull(currentSize);
+      currentSize++;
+      return;
+    }
+
+    ensureCapacity();
+
+    switch (type) {
+    case BOOLEAN:
+      bools[currentSize] = (boolean) value;
+      break;
+    case BYTE:
+      bytes[currentSize] = (byte) value;
+      break;
+    case SHORT:
+      shorts[currentSize] = (short) value;
+      break;
+    case INTEGER:
+      ints[currentSize] = (int) value;
+      break;
+    case LONG:
+      longs[currentSize] = (long) value;
+      break;
+    case FLOAT:
+      floats[currentSize] = (float) value;
+      break;
+    case DOUBLE:
+      doubles[currentSize] = (double) value;
+      break;
+    case BINARY:
+      buffers[currentSize] = (byte[]) value;
+      break;
+    case STRING:
+      strings[currentSize] = toHiveString(value, false);
+      break;
+    }
+
+    currentSize += 1;
+  }
+
+  public Object getValues() {
+    switch (type) {
+    case BOOLEAN:
+      return (bools.length != currentSize) ? Arrays.copyOfRange(bools, 0, currentSize) : bools;
+    case BYTE:
+      return (bytes.length != currentSize) ? Arrays.copyOfRange(bytes, 0, currentSize) : bytes;
+    case SHORT:
+      return (shorts.length != currentSize) ? Arrays.copyOfRange(shorts, 0, currentSize) : shorts;
+    case INTEGER:
+      return (ints.length != currentSize) ? Arrays.copyOfRange(ints, 0, currentSize) : ints;
+    case LONG:
+      return (longs.length != currentSize) ? Arrays.copyOfRange(longs, 0, currentSize) : longs;
+    case FLOAT:
+      return (floats.length != currentSize) ? Arrays.copyOfRange(floats, 0, currentSize) : floats;
+    case DOUBLE:
+      return (doubles.length != currentSize) ? Arrays.copyOfRange(doubles, 0, currentSize)
+        : doubles;
+    case BINARY:
+      // org.apache.hadoop.hive.serde2.thrift.ColumnBuffer expects a List<ByteBuffer>, so convert
+      // when reading the value. The Hive/Thrift stack also dislikes nulls and lists
+      // with a different number of elements than expected.
+      return Arrays.stream(buffers)
+        .limit(currentSize)
+        .map(b -> (b != null) ? ByteBuffer.wrap(b) : EMPTY_BUFFER)
+        .collect(Collectors.toList());
+    case STRING:
+      // org.apache.hadoop.hive.serde2.thrift.ColumnBuffer expects a List<String>, so convert
+      // when reading the value. The Hive/Thrift stack also dislikes nulls and lists
+      // with a different number of elements than expected.
+      return Arrays.stream(strings)
+        .limit(currentSize)
+        .map(s -> (s != null) ? s : EMPTY_STRING)
+        .collect(Collectors.toList());
+    }
+
+    return null;
+  }
+
+  public BitSet getNulls() {
+    return nulls != null ? BitSet.valueOf(nulls) : new BitSet();
+  }
+
+  private boolean isNull(int index) {
+    if (nulls == null) {
+      return false;
+    }
+
+    int byteIdx = (index / Byte.SIZE);
+    if (byteIdx >= nulls.length) {
+      return false;
+    }
+
+    int bitIdx = (index % Byte.SIZE);
+    return (nulls[byteIdx] & (1 << bitIdx)) != 0;
+  }
+
+  private void setNull(int index) {
+    int byteIdx = (index / Byte.SIZE);
+
+    if (nulls == null) {
+      nulls = new byte[byteIdx + 1];
+    } else if (byteIdx >= nulls.length) {
+      nulls = Arrays.copyOf(nulls, byteIdx + 1);
+    }
+
+    int bitIdx = (index % Byte.SIZE);
+    nulls[byteIdx] = (byte) (nulls[byteIdx] | (1 << bitIdx));
+  }
+
+  /**
+   * Converts a value from a Spark dataset into a string that looks like what Hive would
+   * generate. Because Spark generates rows that contain Scala types for non-primitive
+   * columns, this code depends on Scala and is thus susceptible to binary compatibility
+   * changes in the Scala libraries.
+   *
+   * The supported types are described in Spark's SQL programming guide, in the table
+   * listing the mapping of SQL types to Scala types.
+   *
+   * @param value The object to stringify.
+   * @param quoteStrings Whether to wrap String instances in quotes.
+   */
+  private String toHiveString(Object value, boolean quoteStrings) {
+    if (quoteStrings && value instanceof String) {
+      return "\"" + value + "\"";
+    } else if (value instanceof BigDecimal) {
+      return ((BigDecimal) value).stripTrailingZeros().toString();
+    } else if (value instanceof Map) {
+      return stream(new ScalaIterator<>(((Map<?,?>) value).iterator()))
+        .map(o -> toHiveString(o, true))
+        .sorted()
+        .collect(Collectors.joining(",", "{", "}"));
+    } else if (value instanceof Seq) {
+      return stream(new ScalaIterator<>(((Seq<?>) value).iterator()))
+        .map(o -> toHiveString(o, true))
+        .collect(Collectors.joining(",", "[", "]"));
+    } else if (value instanceof Tuple2) {
+      Tuple2 t = (Tuple2) value;
+      return String.format("%s:%s", toHiveString(t._1(), true), toHiveString(t._2(), true));
+    } else if (value instanceof Row) {
+      Row r = (Row) value;
+      final StructField[] fields = r.schema().fields();
+      final AtomicInteger idx = new AtomicInteger();
+
+      return stream(new ScalaIterator<>(r.toSeq().iterator()))
+        .map(o -> {
+          String fname = fields[idx.getAndIncrement()].name();
+          String fval = toHiveString(o, true);
+          return String.format("\"%s\":%s", fname, fval);
+        })
+        .collect(Collectors.joining(",", "{", "}"));
+    } else {
+      return value.toString();
+    }
+  }
+
+  private Stream<?> stream(Iterator<?> it) {
+    return StreamSupport.stream(
+      Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
+  }
+
+  private void ensureCapacity() {
+    int nextSize = (currentSize + DEFAULT_SIZE);
+    nextSize = nextSize - (nextSize % DEFAULT_SIZE);
+
+    switch (type) {
+    case BOOLEAN:
+      if (bools.length <= currentSize) {
+        bools = Arrays.copyOf(bools, nextSize);
+      }
+      break;
+    case BYTE:
+      if (bytes.length <= currentSize) {
+        bytes = Arrays.copyOf(bytes, nextSize);
+      }
+      break;
+    case SHORT:
+      if (shorts.length <= currentSize) {
+        shorts = Arrays.copyOf(shorts, nextSize);
+      }
+      break;
+    case INTEGER:
+      if (ints.length <= currentSize) {
+        ints = Arrays.copyOf(ints, nextSize);
+      }
+      break;
+    case LONG:
+      if (longs.length <= currentSize) {
+        longs = Arrays.copyOf(longs, nextSize);
+      }
+      break;
+    case FLOAT:
+      if (floats.length <= currentSize) {
+        floats = Arrays.copyOf(floats, nextSize);
+      }
+      break;
+    case DOUBLE:
+      if (doubles.length <= currentSize) {
+        doubles = Arrays.copyOf(doubles, nextSize);
+      }
+      break;
+    case BINARY:
+      if (buffers.length <= currentSize) {
+        buffers = Arrays.copyOf(buffers, nextSize);
+      }
+      break;
+    case STRING:
+      if (strings.length <= currentSize) {
+        strings = Arrays.copyOf(strings, nextSize);
+      }
+      break;
+    }
+  }
+
+}

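As a quick illustration of the buffer and its manual null bitmask, a small same-package sketch (the class name and values are mine, not from the patch):

    import java.util.BitSet;

    public class ColumnBufferSketch {
      public static void main(String[] args) {
        ColumnBuffer col = new ColumnBuffer(DataType.INTEGER);
        col.add(1);
        col.add(null);  // recorded as a bit in the manual null mask
        col.add(3);

        int[] values = (int[]) col.getValues();  // trimmed copy: [1, 0, 3]
        BitSet nulls = col.getNulls();           // {1}: the second row was null
        System.out.println(values.length + " rows, nulls at " + nulls);
      }
    }
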
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/DataType.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/DataType.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/DataType.java
new file mode 100644
index 0000000..ab8f665
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/DataType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.thriftserver.session;
+
+/**
+ * Enum representing the way user data is encoded on the wire between the Livy session and
+ * the server.
+ */
+public enum DataType {
+
+  BOOLEAN,
+  BYTE,
+  SHORT,
+  INTEGER,
+  LONG,
+  FLOAT,
+  DOUBLE,
+  BINARY,
+  STRING;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultJob.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultJob.java
new file mode 100644
index 0000000..09b69c9
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultJob.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.Iterator;
+
+import org.apache.spark.sql.Row;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * Job used to fetch results of a query.
+ */
+public class FetchResultJob implements Job<ResultSet> {
+
+  private final String sessionId;
+  private final String statementId;
+  private final int maxRows;
+
+  public FetchResultJob() {
+    this(null, null, -1);
+  }
+
+  public FetchResultJob(String sessionId, String statementId, int maxRows) {
+    this.sessionId = sessionId;
+    this.statementId = statementId;
+    this.maxRows = maxRows;
+  }
+
+  @Override
+  public ResultSet call(JobContext ctx) {
+    ThriftSessionState session = ThriftSessionState.get(ctx, sessionId);
+    StatementState st = session.findStatement(statementId);
+    Iterator<Row> iter = st.iter;
+
+    ResultSet rs = new ResultSet(st.types, st.schema);
+    int count = 0;
+    while (iter.hasNext() && count < maxRows) {
+      Row row = iter.next();
+      Object[] cols = new Object[st.types.length];
+      for (int i = 0; i < cols.length; i++) {
+        cols[i] = row.get(i);
+      }
+      rs.addRow(cols);
+      count++;
+    }
+
+    return rs;
+  }
+
+}

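The maxRows parameter supports paged fetches: the caller keeps submitting this job until a page comes back smaller than the requested size. A rough same-package sketch of that loop, assuming a LivyClient handle and placeholder IDs (the real server drives this through its RPC layer):

    import org.apache.livy.LivyClient;

    final class FetchLoopSketch {
      static void drain(LivyClient client, String sessionId, String stmtId,
          int maxRows) throws Exception {
        while (true) {
          ResultSet rs = client.submit(
            new FetchResultJob(sessionId, stmtId, maxRows)).get();
          int fetched =
            rs.getColumns().length > 0 ? rs.getColumns()[0].size() : 0;
          // ... convert rs into Thrift row sets here ...
          if (fetched < maxRows) {
            break;  // the statement's iterator is exhausted
          }
        }
      }
    }
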
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultSchemaJob.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultSchemaJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultSchemaJob.java
new file mode 100644
index 0000000..13167ec
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchResultSchemaJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * Job used to fetch the schema of a query's results.
+ */
+public class FetchResultSchemaJob implements Job<String> {
+
+  private final String sessionId;
+  private final String statementId;
+
+  public FetchResultSchemaJob() {
+    this(null, null);
+  }
+
+  public FetchResultSchemaJob(String sessionId, String statementId) {
+    this.sessionId = sessionId;
+    this.statementId = statementId;
+  }
+
+  @Override
+  public String call(JobContext ctx) {
+    ThriftSessionState session = ThriftSessionState.get(ctx, sessionId);
+    return session.findStatement(statementId).schema;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
new file mode 100644
index 0000000..6a6bec3
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+/**
+ * Job used to register a new Thrift session. Initializes the session state and stores it in the
+ * job context.
+ */
+public class RegisterSessionJob implements Job<Boolean> {
+
+  private final String sessionId;
+
+  public RegisterSessionJob() {
+    this(null);
+  }
+
+  public RegisterSessionJob(String sessionId) {
+    this.sessionId = sessionId;
+  }
+
+  @Override
+  public Boolean call(JobContext ctx) throws Exception {
+    ThriftSessionState.create(ctx, sessionId);
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
new file mode 100644
index 0000000..3fb69f4
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.thriftserver.session;
+
+/**
+ * Utility class used for transferring results from the Spark application to the Livy server.
+ */
+public class ResultSet {
+
+  private final String schema;
+  private final ColumnBuffer[] columns;
+
+  public ResultSet() {
+    this.schema = null;
+    this.columns = null;
+  }
+
+  public ResultSet(DataType[] types, String schema) {
+    this.schema = schema;
+    this.columns = new ColumnBuffer[types.length];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = new ColumnBuffer(types[i]);
+    }
+  }
+
+  public void addRow(Object[] fields) {
+    if (fields.length != columns.length) {
+      throw new IllegalArgumentException("Wrong number of columns in given row.");
+    }
+
+    for (int i = 0; i < fields.length; i++) {
+      columns[i].add(fields[i]);
+    }
+  }
+
+  public String getSchema() {
+    return schema;
+  }
+
+  public ColumnBuffer[] getColumns() {
+    return columns;
+  }
+
+}

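Putting the two classes together, a small same-package sketch of how the session side assembles results (the schema string is a placeholder for the JSON Spark produces):

    public class ResultSetSketch {
      public static void main(String[] args) {
        DataType[] types = { DataType.STRING, DataType.LONG };
        ResultSet rs = new ResultSet(types, "<schema-json>");  // placeholder
        rs.addRow(new Object[] { "a", 1L });
        rs.addRow(new Object[] { null, 2L });  // null lands in the column's bitmask
        System.out.println(rs.getColumns()[0].size() + " rows buffered");  // 2
      }
    }
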
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ScalaIterator.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ScalaIterator.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ScalaIterator.java
new file mode 100644
index 0000000..bc995a9
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ScalaIterator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.Iterator;
+
+/**
+ * Wrapper that provides a Java iterator interface over a Scala iterator.
+ */
+class ScalaIterator<T> implements Iterator<T> {
+
+  private final scala.collection.Iterator<T> it;
+
+  ScalaIterator(scala.collection.Iterator<T> it) {
+    this.it = it;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return it.hasNext();
+  }
+
+  @Override
+  public T next() {
+    return it.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+}

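This is the same bridging ColumnBuffer uses to walk nested Scala values with Java streams. A same-package sketch, assuming the Row comes from a collected Dataset<Row>:

    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    import org.apache.spark.sql.Row;

    final class RowStreamSketch {
      // Expose a Row's values as a Java Stream via the wrapper.
      static Stream<Object> values(Row row) {
        return StreamSupport.stream(
          Spliterators.spliteratorUnknownSize(
            new ScalaIterator<Object>(row.toSeq().iterator()),
            Spliterator.ORDERED),
          false);
      }
    }
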
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/39fa887c/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
----------------------------------------------------------------------
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
new file mode 100644
index 0000000..99eab4d
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import org.apache.spark.sql.types.*;
+
+/**
+ * Utilities that should only be executed on the session side; they'll throw errors on the
+ * Livy server side since Spark classes are not available.
+ */
+final class SparkUtils {
+
+  /**
+   * @return the wire types used to encode the given Spark schema.
+   */
+  public static DataType[] translateSchema(StructType schema) {
+    DataType[] types = new DataType[schema.fields().length];
+    int idx = 0;
+    for (StructField f : schema.fields()) {
+      Object ftype = f.dataType();
+      if (ftype instanceof BooleanType) {
+        types[idx] = DataType.BOOLEAN;
+      } else if (ftype instanceof ByteType) {
+        types[idx] = DataType.BYTE;
+      } else if (ftype instanceof ShortType) {
+        types[idx] = DataType.SHORT;
+      } else if (ftype instanceof IntegerType) {
+        types[idx] = DataType.INTEGER;
+      } else if (ftype instanceof LongType) {
+        types[idx] = DataType.LONG;
+      } else if (ftype instanceof FloatType) {
+        types[idx] = DataType.FLOAT;
+      } else if (ftype instanceof DoubleType) {
+        types[idx] = DataType.DOUBLE;
+      } else if (ftype instanceof BinaryType) {
+        types[idx] = DataType.BINARY;
+      } else {
+        types[idx] = DataType.STRING;
+      }
+      idx++;
+    }
+    return types;
+  }
+
+}
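
Note that any Spark type without a direct wire mapping (decimals, dates, structs, ...) falls through to STRING and is stringified by ColumnBuffer. A same-package usage sketch:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    final class TranslateSketch {
      public static void main(String[] args) {
        StructType schema = new StructType()
          .add("id", DataTypes.LongType)
          .add("price", DataTypes.createDecimalType(10, 2));
        DataType[] wire = SparkUtils.translateSchema(schema);
        // wire is [LONG, STRING]: decimals have no primitive wire type.
        System.out.println(java.util.Arrays.toString(wire));
      }
    }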