Posted to commits@livy.apache.org by js...@apache.org on 2019/08/16 02:32:30 UTC

[incubator-livy] branch master updated: [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new cae9d97  [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server
cae9d97 is described below

commit cae9d97185bf371912dcd863dff5babfd9cb704a
Author: yihengwang <yi...@tencent.com>
AuthorDate: Fri Aug 16 10:32:11 2019 +0800

    [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server
    
    ## What changes were proposed in this pull request?
    In this patch, we add implementations of GetSchemas, GetFunctions, GetTables, and GetColumns to the Livy Thrift server.
    
    https://issues.apache.org/jira/browse/LIVY-622
    https://issues.apache.org/jira/browse/LIVY-623
    https://issues.apache.org/jira/browse/LIVY-624
    https://issues.apache.org/jira/browse/LIVY-625
    
    ## How was this patch tested?
    Added new unit tests and an integration test, and ran them together with the existing tests.
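    
    For illustration, the four operations map onto the standard JDBC metadata
    calls. A minimal sketch of how a client might exercise them (host, port,
    database, and table name below are assumptions, not part of this patch):
    
        val conn = java.sql.DriverManager.getConnection(
          "jdbc:hive2://localhost:10090/default", "user", "")
        val metadata = conn.getMetaData
        val schemas = metadata.getSchemas()                                     // GetSchemas
        val tables = metadata.getTables("", "default", "*", Array("TABLE"))     // GetTables
        val columns = metadata.getColumns("", "default", "my_table", null)      // GetColumns (null = all columns)
        val functions = metadata.getFunctions("", "default", "unix_timestamp")  // GetFunctions
        conn.close()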
    
    Author: yihengwang <yi...@tencent.com>
    
    Closes #194 from yiheng/fix_575.
---
 .../apache/livy/thriftserver/LivyCLIService.scala  |  16 +--
 .../livy/thriftserver/LivyOperationManager.scala   |  63 ++++++++
 .../livy/thriftserver/cli/ThriftCLIService.scala   |  17 ++-
 .../operation/GetColumnsOperation.scala            | 102 +++++++++++++
 .../operation/GetFunctionsOperation.scala          |  94 ++++++++++++
 .../operation/GetSchemasOperation.scala            |  63 ++++++++
 .../operation/GetTablesOperation.scala             |  73 ++++++++++
 .../thriftserver/operation/MetadataOperation.scala |   6 +
 .../operation/SparkCatalogOperation.scala          | 119 ++++++++++++++++
 .../livy/thriftserver/ThriftServerSuites.scala     | 158 ++++++++++++++++++++-
 .../livy/thriftserver/session/CatalogJobState.java |  28 ++++
 .../session/CleanupCatalogResultJob.java           |  37 +++++
 .../livy/thriftserver/session/ColumnBuffer.java    |  36 +++++
 .../session/FetchCatalogResultJob.java             |  51 +++++++
 .../livy/thriftserver/session/GetColumnsJob.java   |  93 ++++++++++++
 .../livy/thriftserver/session/GetFunctionsJob.java |  67 +++++++++
 .../livy/thriftserver/session/GetSchemasJob.java   |  47 ++++++
 .../livy/thriftserver/session/GetTablesJob.java    |  92 ++++++++++++
 .../livy/thriftserver/session/SparkCatalogJob.java |  50 +++++++
 .../livy/thriftserver/session/SparkUtils.java      | 113 +++++++++++++++
 .../thriftserver/session/ThriftSessionState.java   |  32 +++++
 .../thriftserver/session/ThriftSessionTest.java    |  53 ++++++-
 22 files changed, 1395 insertions(+), 15 deletions(-)

diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
index 725bdc8..3c84b4a 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
@@ -215,8 +215,8 @@ class LivyCLIService(server: LivyThriftServer)
       sessionHandle: SessionHandle,
       catalogName: String,
       schemaName: String): OperationHandle = {
-    // TODO
-    throw new HiveSQLException("Operation GET_SCHEMAS is not yet supported")
+    sessionManager.operationManager.getSchemas(
+      sessionHandle, catalogName, schemaName)
   }
 
   @throws[HiveSQLException]
@@ -226,8 +226,8 @@ class LivyCLIService(server: LivyThriftServer)
       schemaName: String,
       tableName: String,
       tableTypes: util.List[String]): OperationHandle = {
-    // TODO
-    throw new HiveSQLException("Operation GET_TABLES is not yet supported")
+    sessionManager.operationManager.getTables(
+      sessionHandle, catalogName, schemaName, tableName, tableTypes)
   }
 
   @throws[HiveSQLException]
@@ -243,8 +243,8 @@ class LivyCLIService(server: LivyThriftServer)
       schemaName: String,
       tableName: String,
       columnName: String): OperationHandle = {
-    // TODO
-    throw new HiveSQLException("Operation GET_COLUMNS is not yet supported")
+    sessionManager.operationManager.getColumns(
+      sessionHandle, catalogName, schemaName, tableName, columnName)
   }
 
   @throws[HiveSQLException]
@@ -253,8 +253,8 @@ class LivyCLIService(server: LivyThriftServer)
       catalogName: String,
       schemaName: String,
       functionName: String): OperationHandle = {
-    // TODO
-    throw new HiveSQLException("Operation GET_FUNCTIONS is not yet supported")
+    sessionManager.operationManager.getFunctions(
+      sessionHandle, catalogName, schemaName, functionName)
   }
 
   @throws[HiveSQLException]
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
index e6d48ff..2454185 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
@@ -187,6 +187,69 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
     })
   }
 
+  @throws[HiveSQLException]
+  def getTables(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: util.List[String]): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetTablesOperation(
+        sessionHandle,
+        catalogName,
+        schemaName,
+        tableName,
+        tableTypes,
+        livyThriftSessionManager)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
+  @throws[HiveSQLException]
+  def getFunctions(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      functionName: String): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetFunctionsOperation(sessionHandle, catalogName, schemaName, functionName,
+        livyThriftSessionManager)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
+  @throws[HiveSQLException]
+  def getSchemas(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetSchemasOperation(sessionHandle, catalogName, schemaName,
+        livyThriftSessionManager)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
+  @throws[HiveSQLException]
+  def getColumns(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      columnName: String): OperationHandle = {
+    executeOperation(sessionHandle, {
+      val op = new GetColumnsOperation(sessionHandle, catalogName, schemaName, tableName,
+        columnName, livyThriftSessionManager)
+      addOperation(op, sessionHandle)
+      op
+    })
+  }
+
   /**
    * Cancel the running operation unless it is already in a terminal state
    */
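
The four methods added above share the same create-register-return shape. A
hypothetical helper (not part of this patch; it assumes executeOperation takes
a by-name Operation block, as the call sites above suggest) could factor it out:

    private def runMetadataOperation(sessionHandle: SessionHandle)
        (createOp: => Operation): OperationHandle = {
      executeOperation(sessionHandle, {
        val op = createOp
        addOperation(op, sessionHandle) // register so the handle can be looked up later
        op
      })
    }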
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala
index 4a3276f..da108ab 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala
@@ -427,8 +427,8 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
   override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
     val resp = new TGetSchemasResp
     try {
-      val opHandle = cliService.getSchemas(
-        new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
+      val opHandle = cliService.getSchemas(createSessionHandle(req.getSessionHandle),
+        req.getCatalogName, req.getSchemaName)
       resp.setOperationHandle(opHandle.toTOperationHandle)
       resp.setStatus(ThriftCLIService.OK_STATUS)
     } catch {
@@ -444,7 +444,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
     val resp = new TGetTablesResp
     try {
       val opHandle = cliService.getTables(
-        new SessionHandle(req.getSessionHandle),
+        createSessionHandle(req.getSessionHandle),
         req.getCatalogName,
         req.getSchemaName,
         req.getTableName,
@@ -479,7 +479,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
     val resp = new TGetColumnsResp
     try {
       val opHandle = cliService.getColumns(
-        new SessionHandle(req.getSessionHandle),
+        createSessionHandle(req.getSessionHandle),
         req.getCatalogName,
         req.getSchemaName,
         req.getTableName,
@@ -499,7 +499,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
     val resp = new TGetFunctionsResp
     try {
       val opHandle = cliService.getFunctions(
-        new SessionHandle(req.getSessionHandle),
+        createSessionHandle(req.getSessionHandle),
         req.getCatalogName,
         req.getSchemaName,
         req.getFunctionName)
@@ -728,6 +728,13 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
           s"Failed to validate proxy privilege of $realUser for $proxyUser", "08S01", e)
     }
   }
+
+  private def createSessionHandle(tHandle: TSessionHandle): SessionHandle = {
+    val protocolVersion = cliService.getSessionManager
+      .getSessionInfo(new SessionHandle(tHandle))
+      .protocolVersion
+    new SessionHandle(tHandle, protocolVersion)
+  }
 }
 
 object ThriftCLIService {
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
new file mode 100644
index 0000000..c9c106c
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.session.GetColumnsJob
+
+class GetColumnsOperation(
+    sessionHandle: SessionHandle,
+    catalogName: String,
+    schemaName: String,
+    tableName: String,
+    columnName: String,
+    sessionManager: LivyThriftSessionManager)
+  extends SparkCatalogOperation(
+    sessionHandle, OperationType.GET_COLUMNS, sessionManager) with Logging {
+
+  @throws(classOf[HiveSQLException])
+  override protected def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+    try {
+      rscClient.submit(new GetColumnsJob(
+        convertSchemaPattern(schemaName),
+        convertIdentifierPattern(tableName, datanucleusFormat = true),
+        Option(columnName).map { convertIdentifierPattern(_, datanucleusFormat = false) }.orNull,
+        sessionId,
+        jobId
+      )).get()
+
+      setState(OperationState.FINISHED)
+    } catch {
+      case e: Throwable =>
+        error("Remote job meet an exception: ", e)
+        setState(OperationState.ERROR)
+        throw new HiveSQLException(e)
+    }
+  }
+
+  @throws(classOf[HiveSQLException])
+  override def getResultSetSchema: Schema = {
+    assertState(Seq(OperationState.FINISHED))
+    GetColumnsOperation.SCHEMA
+  }
+}
+
+object GetColumnsOperation {
+  val SCHEMA = Schema(
+    Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."),
+    Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
+    Field("TABLE_NAME", BasicDataType("string"), "Table name."),
+    Field("COLUMN_NAME", BasicDataType("string"), "Column name"),
+    Field("DATA_TYPE", BasicDataType("integer"), "SQL type from java.sql.Types"),
+    Field("TYPE_NAME", BasicDataType("string"),
+      "Data source dependent type name, for a UDT the type name is fully qualified"),
+    Field("COLUMN_SIZE", BasicDataType("integer"), "Column size. For char or date types this is " +
+      "the maximum number of characters, for numeric or decimal types this is precision."),
+    Field("BUFFER_LENGTH", BasicDataType("byte"), "Unused"),
+    Field("DECIMAL_DIGITS", BasicDataType("integer"), "The number of fractional digits"),
+    Field("NUM_PREC_RADIX", BasicDataType("integer"), "Radix (typically either 10 or 2)"),
+    Field("NULLABLE", BasicDataType("integer"), "Is NULL allowed"),
+    Field("REMARKS", BasicDataType("string"), "Comment describing column (may be null)"),
+    Field("COLUMN_DEF", BasicDataType("string"), "Default value (may be null)"),
+    Field("SQL_DATA_TYPE", BasicDataType("integer"), "Unused"),
+    Field("SQL_DATETIME_SUB", BasicDataType("integer"), "Unused"),
+    Field("CHAR_OCTET_LENGTH", BasicDataType("integer"), "For char types the maximum number of " +
+      "bytes in the column"),
+    Field("ORDINAL_POSITION", BasicDataType("integer"), "Index of column in table (starting at 1)"),
+    Field("IS_NULLABLE", BasicDataType("string"), "\"NO\" means column definitely does not " +
+      "allow NULL values; \"YES\" means the column might allow NULL values. An empty string " +
+      "means nobody knows."),
+    Field("SCOPE_CATALOG", BasicDataType("string"), "Catalog of table that is the scope of a " +
+      "reference attribute (null if DATA_TYPE isn't REF)"),
+    Field("SCOPE_SCHEMA", BasicDataType("string"), "Schema of table that is the scope of a " +
+      "reference attribute (null if the DATA_TYPE isn't REF)"),
+    Field("SCOPE_TABLE", BasicDataType("string"), "Table name that this the scope of a " +
+      "reference attribure (null if the DATA_TYPE isn't REF)"),
+    Field("SOURCE_DATA_TYPE", BasicDataType("short"), "Source type of a distinct type or " +
+      "user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't " +
+      "DISTINCT or user-generated REF)"),
+    Field("IS_AUTO_INCREMENT", BasicDataType("string"), "Indicates whether this column is " +
+      "auto incremented.")
+  )
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
new file mode 100644
index 0000000..0e43f16
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.session.GetFunctionsJob
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+
+class GetFunctionsOperation(
+    sessionHandle: SessionHandle,
+    catalogName: String,
+    schemaName: String,
+    functionName: String,
+    sessionManager: LivyThriftSessionManager)
+  extends SparkCatalogOperation(
+    sessionHandle, OperationType.GET_FUNCTIONS, sessionManager) with Logging {
+
+  @throws(classOf[HiveSQLException])
+  override protected def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+    try {
+      rscClient.submit(new GetFunctionsJob(
+        convertSchemaPattern(schemaName),
+        convertFunctionName(functionName),
+        sessionId,
+        jobId
+      )).get()
+
+      setState(OperationState.FINISHED)
+    } catch {
+      case e: Throwable =>
+        error("Remote job meet an exception: ", e)
+        setState(OperationState.ERROR)
+        throw new HiveSQLException(e)
+    }
+  }
+
+  @throws(classOf[HiveSQLException])
+  override def getResultSetSchema: Schema = {
+    assertState(Seq(OperationState.FINISHED))
+    GetFunctionsOperation.SCHEMA
+  }
+
+  private def convertFunctionName(name: String): String = {
+    if (name == null) {
+      ".*"
+    } else {
+      var escape = false
+      name.flatMap {
+        case c if escape =>
+          if (c != '\\') escape = false
+          c.toString
+        case '\\' =>
+          escape = true
+          ""
+        case '%' => ".*"
+        case '_' => "."
+        case c => Character.toLowerCase(c).toString
+      }
+    }
+  }
+}
+
+object GetFunctionsOperation {
+  val SCHEMA = Schema(
+    Field("FUNCTION_CAT", BasicDataType("string"), "Function catalog (may be null)"),
+    Field("FUNCTION_SCHEM", BasicDataType("string"), "Function schema (may be null)"),
+    Field("FUNCTION_NAME", BasicDataType("string"),
+      "Function name. This is the name used to invoke the function"),
+    Field("REMARKS", BasicDataType("string"), "Explanatory comment on the function"),
+    Field("FUNCTION_TYPE", BasicDataType("integer"),
+      "Kind of function."),
+    Field("SPECIFIC_NAME", BasicDataType("string"),
+      "The name which uniquely identifies this function within its schema")
+  )
+}
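
For reference, hand-traced behavior of convertFunctionName above (the method is
private, so these calls are illustrative only):

    convertFunctionName(null)          // ".*"          -- null matches every function
    convertFunctionName("Unix_Time%")  // "unix.time.*" -- lower-cased; JDBC '_' and '%' become '.' and '.*'
    convertFunctionName("\\_tmp")      // "_tmp"        -- an escaped underscore stays literal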
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
new file mode 100644
index 0000000..6bd0a17
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli._
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.session.GetSchemasJob
+
+class GetSchemasOperation(
+    sessionHandle: SessionHandle,
+    catalogName: String,
+    schemaName: String,
+    sessionManager: LivyThriftSessionManager)
+  extends SparkCatalogOperation(
+    sessionHandle, OperationType.GET_SCHEMAS, sessionManager) with Logging {
+
+  @throws(classOf[HiveSQLException])
+  override protected def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+    try {
+      rscClient.submit(new GetSchemasJob(
+        convertSchemaPattern(schemaName), sessionId, jobId
+      )).get()
+      setState(OperationState.FINISHED)
+    } catch {
+      case e: Throwable =>
+        error("Remote job meet an exception: ", e)
+        setState(OperationState.ERROR)
+        throw new HiveSQLException(e)
+    }
+  }
+
+  @throws(classOf[HiveSQLException])
+  override def getResultSetSchema: Schema = {
+    assertState(Seq(OperationState.FINISHED))
+    GetSchemasOperation.SCHEMA
+  }
+}
+
+object GetSchemasOperation {
+  val SCHEMA = Schema(
+    Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
+    Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name.")
+  )
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
new file mode 100644
index 0000000..4a939b3
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.session.GetTablesJob
+
+class GetTablesOperation(
+    sessionHandle: SessionHandle,
+    catalogName: String,
+    schemaName: String,
+    tableName: String,
+    tableTypes: java.util.List[String],
+    sessionManager: LivyThriftSessionManager)
+  extends SparkCatalogOperation(
+    sessionHandle, OperationType.GET_TABLES, sessionManager) with Logging {
+
+  @throws(classOf[HiveSQLException])
+  override protected def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+    try {
+      rscClient.submit(new GetTablesJob(
+        convertSchemaPattern(schemaName),
+        convertIdentifierPattern(tableName, datanucleusFormat = true),
+        tableTypes,
+        sessionId,
+        jobId
+      )).get()
+
+      setState(OperationState.FINISHED)
+    } catch {
+      case e: Throwable =>
+        error("Remote job meet an exception: ", e)
+        setState(OperationState.ERROR)
+        throw new HiveSQLException(e)
+    }
+  }
+
+  @throws(classOf[HiveSQLException])
+  override def getResultSetSchema: Schema = {
+    assertState(Seq(OperationState.FINISHED))
+    GetTablesOperation.SCHEMA
+  }
+}
+
+object GetTablesOperation {
+  val SCHEMA = Schema(
+    Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."),
+    Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
+    Field("TABLE_NAME", BasicDataType("string"), "Table name."),
+    Field("TABLE_TYPE", BasicDataType("string"), "The table type, e.g. \"TABLE\", \"VIEW\", etc."),
+    Field("REMARKS", BasicDataType("string"), "Comments about the table.")
+  )
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
index 4db3929..4689f26 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
@@ -21,6 +21,12 @@ import org.apache.hive.service.cli.{FetchOrientation, HiveSQLException, Operatio
 
 import org.apache.livy.thriftserver.serde.ThriftResultSet
 
+/**
+  * MetadataOperation is the base class for operations which do not perform any call on the
+  * Spark side.
+  *
+  * @param sessionHandle the handle of the session the operation belongs to
+  * @param opType the type of the operation
+  */
 abstract class MetadataOperation(sessionHandle: SessionHandle, opType: OperationType)
   extends Operation(sessionHandle, opType) {
   setHasResultSet(true)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
new file mode 100644
index 0000000..9ed31f7
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.commons.lang.StringUtils
+import org.apache.hive.service.cli._
+
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.serde.ThriftResultSet
+import org.apache.livy.thriftserver.session.{CleanupCatalogResultJob, FetchCatalogResultJob}
+
+/**
+ * SparkCatalogOperation is the base class for operations which need to fetch catalog information
+ * from the Spark session.
+ */
+abstract class SparkCatalogOperation(
+    sessionHandle: SessionHandle,
+    opType: OperationType,
+    sessionManager: LivyThriftSessionManager)
+  extends Operation(sessionHandle, opType) {
+
+  // The initialization needs to be lazy so that it does not block when the instance is created
+  protected lazy val rscClient = {
+    // This call is blocking: it waits for the session to be ready.
+    sessionManager.getLivySession(sessionHandle).client.get
+  }
+
+  protected lazy val jobId = {
+    this.opHandle.getHandleIdentifier.getPublicId.toString + "-" +
+      this.opHandle.getHandleIdentifier.getSecretId.toString
+  }
+
+  protected lazy val sessionId = {
+    sessionHandle.getSessionId.toString
+  }
+
+  @throws[HiveSQLException]
+  override def close(): Unit = {
+    val cleaned = rscClient.submit(new CleanupCatalogResultJob(sessionId, jobId)).get()
+    if (!cleaned) {
+      warn(s"Fail to cleanup fetch catalog job (session = ${sessionId}), " +
+        "this message can be ignored if the job failed.")
+    }
+    setState(OperationState.CLOSED)
+  }
+
+  @throws[HiveSQLException]
+  override def cancel(stateAfterCancel: OperationState): Unit = {
+    setState(OperationState.CANCELED)
+    // Fetching catalog information is not really a Spark job: it runs only on the driver and cannot be cancelled
+    throw new UnsupportedOperationException("SparkCatalogOperation.cancel()")
+  }
+
+  /**
+   * Convert wildcards and escape sequences from JDBC format to datanucleus/regex format.
+   *
+   * This is ported from Spark Hive Thrift MetaOperation.
+   */
+  protected def convertIdentifierPattern(pattern: String, datanucleusFormat: Boolean): String = {
+    if (pattern == null) {
+      convertPattern("%", datanucleusFormat = true)
+    } else {
+      convertPattern(pattern, datanucleusFormat)
+    }
+  }
+
+  /**
+   * Convert wildcards and escape sequences of a schema pattern from JDBC format to
+   * datanucleus/regex format. The schema pattern also treats the empty string as a wildcard.
+   *
+   * This is ported from Spark Hive Thrift MetaOperation.
+   */
+  protected def convertSchemaPattern(pattern: String): String = {
+    if (StringUtils.isEmpty(pattern)) {
+      convertPattern("%", datanucleusFormat = true)
+    } else {
+      convertPattern(pattern, datanucleusFormat = true)
+    }
+  }
+
+  private def convertPattern(pattern: String, datanucleusFormat: Boolean): String = {
+    val wStr = if (datanucleusFormat) "*" else ".*"
+    pattern
+      .replaceAll("([^\\\\])%", "$1" + wStr)
+      .replaceAll("\\\\%", "%")
+      .replaceAll("^%", wStr)
+      .replaceAll("([^\\\\])_", "$1.")
+      .replaceAll("\\\\_", "_")
+      .replaceAll("^_", ".")
+  }
+
+  override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): ThriftResultSet = {
+    validateFetchOrientation(orientation)
+    assertState(Seq(OperationState.FINISHED))
+    setHasResultSet(true)
+    val maxRows = maxRowsL.toInt
+    val results = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, maxRows)).get()
+
+    val rowSet = ThriftResultSet.apply(getResultSetSchema, protocolVersion)
+    import scala.collection.JavaConverters._
+    results.asScala.foreach { r => rowSet.addRow(r.asInstanceOf[Array[Any]]) }
+    rowSet
+  }
+}
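
For reference, hand-traced results of the pattern helpers above (illustrative
only):

    convertIdentifierPattern(null, datanucleusFormat = true)    // "*"     -- null is treated as "%"
    convertIdentifierPattern("a_b%", datanucleusFormat = true)  // "a.b*"
    convertIdentifierPattern("a_b%", datanucleusFormat = false) // "a.b.*"
    convertSchemaPattern("")                                    // "*"     -- empty also means "match all"
    convertSchemaPattern("db\\_1")                              // "db_1"  -- an escaped '_' stays literal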
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index a3d9e88..6411881 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -17,7 +17,7 @@
 
 package org.apache.livy.thriftserver
 
-import java.sql.{Date, SQLException, Statement}
+import java.sql.{Connection, Date, SQLException, Statement}
 
 import org.apache.livy.LivyConf
 
@@ -69,6 +69,114 @@ trait CommonThriftTests {
     }
     assert(!resultSetComplex.next())
   }
+
+  def getSchemasTest(connection: Connection): Unit = {
+    val metadata = connection.getMetaData
+    val schemaResultSet = metadata.getSchemas()
+    assert(schemaResultSet.getMetaData.getColumnCount == 2)
+    assert(schemaResultSet.getMetaData.getColumnName(1) == "TABLE_SCHEM")
+    assert(schemaResultSet.getMetaData.getColumnName(2) == "TABLE_CATALOG")
+    schemaResultSet.next()
+    assert(schemaResultSet.getString(1) == "default")
+    assert(!schemaResultSet.next())
+  }
+
+  def getFunctionsTest(connection: Connection): Unit = {
+    val metadata = connection.getMetaData
+
+    val functionResultSet = metadata.getFunctions("", "default", "unix_timestamp")
+    assert(functionResultSet.getMetaData.getColumnCount == 6)
+    assert(functionResultSet.getMetaData.getColumnName(1) == "FUNCTION_CAT")
+    assert(functionResultSet.getMetaData.getColumnName(2) == "FUNCTION_SCHEM")
+    assert(functionResultSet.getMetaData.getColumnName(3) == "FUNCTION_NAME")
+    assert(functionResultSet.getMetaData.getColumnName(4) == "REMARKS")
+    assert(functionResultSet.getMetaData.getColumnName(5) == "FUNCTION_TYPE")
+    assert(functionResultSet.getMetaData.getColumnName(6) == "SPECIFIC_NAME")
+    functionResultSet.next()
+    assert(functionResultSet.getString(3) == "unix_timestamp")
+    assert(functionResultSet.getString(6) ==
+      "org.apache.spark.sql.catalyst.expressions.UnixTimestamp")
+    assert(!functionResultSet.next())
+  }
+
+  def getTablesTest(connection: Connection): Unit = {
+    val statement = connection.createStatement()
+    statement.execute("CREATE TABLE test_get_tables (id integer, desc string) USING json")
+    statement.close()
+
+    val metadata = connection.getMetaData
+    val tablesResultSet = metadata.getTables("", "default", "*", Array("TABLE"))
+    assert(tablesResultSet.getMetaData.getColumnCount == 5)
+    assert(tablesResultSet.getMetaData.getColumnName(1) == "TABLE_CAT")
+    assert(tablesResultSet.getMetaData.getColumnName(2) == "TABLE_SCHEM")
+    assert(tablesResultSet.getMetaData.getColumnName(3) == "TABLE_NAME")
+    assert(tablesResultSet.getMetaData.getColumnName(4) == "TABLE_TYPE")
+    assert(tablesResultSet.getMetaData.getColumnName(5) == "REMARKS")
+
+    tablesResultSet.next()
+    assert(tablesResultSet.getString(3) == "test_get_tables")
+    assert(tablesResultSet.getString(4) == "TABLE")
+    assert(!tablesResultSet.next())
+  }
+
+  def getColumnsTest(connection: Connection): Unit = {
+    val metadata = connection.getMetaData
+    val statement = connection.createStatement()
+    statement.execute("CREATE TABLE test_get_columns (id integer, desc string) USING json")
+    statement.close()
+
+    val columnsResultSet = metadata.getColumns("", "default", "test_get_columns", ".*")
+    assert(columnsResultSet.getMetaData.getColumnCount == 23)
+    columnsResultSet.next()
+    assert(columnsResultSet.getString(1) == "")
+    assert(columnsResultSet.getString(2) == "default")
+    assert(columnsResultSet.getString(3) == "test_get_columns")
+    assert(columnsResultSet.getString(4) == "id")
+    assert(columnsResultSet.getInt(5) == 4)
+    assert(columnsResultSet.getString(6) == "integer")
+    assert(columnsResultSet.getInt(7) == 10)
+    assert(columnsResultSet.getString(8) == null)
+    assert(columnsResultSet.getInt(9) == 0)
+    assert(columnsResultSet.getInt(10) == 10)
+    assert(columnsResultSet.getInt(11) == 1)
+    assert(columnsResultSet.getString(12) == "")
+    assert(columnsResultSet.getString(13) == null)
+    assert(columnsResultSet.getString(14) == null)
+    assert(columnsResultSet.getString(15) == null)
+    assert(columnsResultSet.getString(16) == null)
+    assert(columnsResultSet.getInt(17) == 0)
+    assert(columnsResultSet.getString(18) == "YES")
+    assert(columnsResultSet.getString(19) == null)
+    assert(columnsResultSet.getString(20) == null)
+    assert(columnsResultSet.getString(21) == null)
+    assert(columnsResultSet.getString(22) == null)
+    assert(columnsResultSet.getString(23) == "NO")
+    columnsResultSet.next()
+    assert(columnsResultSet.getString(1) == "")
+    assert(columnsResultSet.getString(2) == "default")
+    assert(columnsResultSet.getString(3) == "test_get_columns")
+    assert(columnsResultSet.getString(4) == "desc")
+    assert(columnsResultSet.getInt(5) == 12)
+    assert(columnsResultSet.getString(6) == "string")
+    assert(columnsResultSet.getInt(7) == Integer.MAX_VALUE)
+    assert(columnsResultSet.getString(8) == null)
+    assert(columnsResultSet.getString(9) == null)
+    assert(columnsResultSet.getString(10) == null)
+    assert(columnsResultSet.getInt(11) == 1)
+    assert(columnsResultSet.getString(12) == "")
+    assert(columnsResultSet.getString(13) == null)
+    assert(columnsResultSet.getString(14) == null)
+    assert(columnsResultSet.getString(15) == null)
+    assert(columnsResultSet.getString(16) == null)
+    assert(columnsResultSet.getInt(17) == 1)
+    assert(columnsResultSet.getString(18) == "YES")
+    assert(columnsResultSet.getString(19) == null)
+    assert(columnsResultSet.getString(20) == null)
+    assert(columnsResultSet.getString(21) == null)
+    assert(columnsResultSet.getString(22) == null)
+    assert(columnsResultSet.getString(23) == "NO")
+    assert(!columnsResultSet.next())
+  }
 }
 
 class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
@@ -163,6 +271,30 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest
       assert(caught.getMessage.contains("Table or view not found: `global_temp`.`invalid_table`"))
     }
   }
+
+  test("fetch schemas") {
+    withJdbcConnection { connection =>
+      getSchemasTest(connection)
+    }
+  }
+
+  test("fetch functions") {
+    withJdbcConnection { connection =>
+      getFunctionsTest(connection)
+    }
+  }
+
+  test("fetch tables") {
+    withJdbcConnection { connection =>
+      getTablesTest(connection)
+    }
+  }
+
+  test("fetch column") {
+    withJdbcConnection { connection =>
+      getColumnsTest(connection)
+    }
+  }
 }
 
 class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
@@ -175,4 +307,28 @@ class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests
       dataTypesTest(statement, supportMap)
     }
   }
+
+  test("fetch schemas") {
+    withJdbcConnection { connection =>
+      getSchemasTest(connection)
+    }
+  }
+
+  test("fetch functions") {
+    withJdbcConnection { connection =>
+      getFunctionsTest(connection)
+    }
+  }
+
+  test("fetch tables") {
+    withJdbcConnection { connection =>
+      getTablesTest(connection)
+    }
+  }
+
+  test("fetch column") {
+    withJdbcConnection { connection =>
+      getColumnsTest(connection)
+    }
+  }
 }
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java
new file mode 100644
index 0000000..5571574
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.Iterator;
+
+public class CatalogJobState {
+  final Iterator<Object[]> iter;
+
+  public CatalogJobState(Iterator<Object[]> iter) {
+    this.iter = iter;
+  }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java
new file mode 100644
index 0000000..b9444ca
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public class CleanupCatalogResultJob implements Job<Boolean> {
+  private final String sessionId;
+  private final String jobId;
+
+  public CleanupCatalogResultJob(String sessionId, String jobId) {
+    this.sessionId = sessionId;
+    this.jobId = jobId;
+  }
+
+  @Override
+  public Boolean call(JobContext jc) throws Exception {
+    ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
+    return session.cleanupCatalogJob(jobId);
+  }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
index 4408586..d4ec747 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
@@ -204,6 +204,42 @@ public class ColumnBuffer {
     return nulls != null ? BitSet.valueOf(nulls) : new BitSet();
   }
 
+  public ColumnBuffer extractSubset(int start, int end) {
+    ColumnBuffer subset = new ColumnBuffer(type);
+    subset.currentSize = end - start;
+    subset.ensureCapacity();
+    switch (type) {
+      case BOOLEAN:
+        System.arraycopy(bools, start, subset.bools, 0, end - start);
+        break;
+      case BYTE:
+        System.arraycopy(bytes, start, subset.bytes, 0, end - start);
+        break;
+      case SHORT:
+        System.arraycopy(shorts, start, subset.shorts, 0, end - start);
+        break;
+      case INTEGER:
+        System.arraycopy(ints, start, subset.ints, 0, end - start);
+        break;
+      case LONG:
+        System.arraycopy(longs, start, subset.longs, 0, end - start);
+        break;
+      case FLOAT:
+        System.arraycopy(floats, start, subset.floats, 0, end - start);
+        break;
+      case DOUBLE:
+        System.arraycopy(doubles, start, subset.doubles, 0, end - start);
+        break;
+      case BINARY:
+        System.arraycopy(buffers, start, subset.buffers, 0, end - start);
+        break;
+      case STRING:
+        System.arraycopy(strings, start, subset.strings, 0, end - start);
+        break;
+    }
+    return subset;
+  }
+
   private boolean isNull(int index) {
     if (nulls == null) {
       return false;
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java
new file mode 100644
index 0000000..9654b02
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public class FetchCatalogResultJob implements Job<List<Object[]>> {
+  private final String sessionId;
+  private final String jobId;
+  private final int maxRows;
+
+  public FetchCatalogResultJob(String sessionId, String jobId, int maxRows) {
+    this.sessionId = sessionId;
+    this.jobId = jobId;
+    this.maxRows = maxRows;
+  }
+
+  @Override
+  public List<Object[]> call(JobContext jc) throws Exception {
+    ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
+    Iterator<Object[]> iterator = session.findCatalogJob(jobId).iter;
+
+    List<Object[]> result = new ArrayList<>();
+    int n = 0;
+    while (iterator.hasNext() && n < maxRows) {
+      result.add(iterator.next());
+      n += 1;
+    }
+    return result;
+  }
+}
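
The job above drains the stored iterator in pages of at most maxRows. A minimal
sketch of the server-side loop that would consume it (handleRow is a
placeholder, not part of this patch):

    import scala.collection.JavaConverters._

    var done = false
    while (!done) {
      val rows = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, 100)).get()
      rows.asScala.foreach(handleRow) // process up to 100 rows per round trip
      done = rows.isEmpty
    }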
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
new file mode 100644
index 0000000..bc2bd73
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.types.StructField;
+
+public class GetColumnsJob extends SparkCatalogJob {
+  private final String databasePattern;
+  private final String tablePattern;
+  private final String columnPattern;
+
+  public GetColumnsJob(
+      String databasePattern,
+      String tablePattern,
+      String columnPattern,
+      String sessionId,
+      String jobId) {
+    super(sessionId, jobId);
+    this.databasePattern = databasePattern;
+    this.tablePattern = tablePattern;
+    this.columnPattern = columnPattern;
+  }
+
+  @Override
+  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+    List<Object[]> columnList = new ArrayList<Object[]>();
+    List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
+
+    for (String db : databases) {
+      List<TableIdentifier> tableIdentifiers =
+        seqAsJavaList(catalog.listTables(db, tablePattern));
+      for (TableIdentifier tableIdentifier : tableIdentifiers) {
+        CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+        List<StructField> fields = seqAsJavaList(table.schema());
+        int position = 0;
+        for (StructField field : fields) {
+          if (columnPattern == null || field.name().matches(columnPattern)) {
+            columnList.add(new Object[] {
+              DEFAULT_HIVE_CATALOG,
+              table.database(),
+              table.identifier().table(),
+              field.name(),
+              SparkUtils.toJavaSQLType(field.dataType()), // DATA_TYPE
+              field.dataType().typeName(), // TYPE_NAME
+              SparkUtils.getColumnSize(field.dataType()), // COLUMN_SIZE
+              null, // BUFFER_LENGTH, unused
+              SparkUtils.getDecimalDigits(field.dataType()),
+              SparkUtils.getNumPrecRadix(field.dataType()),
+              field.nullable() ? 1 : 0,
+              field.getComment().isDefined() ? field.getComment().get() : "",
+              null, // COLUMN_DEF
+              null, // SQL_DATA_TYPE
+              null, // SQL_DATETIME_SUB
+              null, // CHAR_OCTET_LENGTH
+              position,
+              field.nullable() ? "YES" : "NO", // IS_NULLABLE
+              null, // SCOPE_CATALOG
+              null, // SCOPE_SCHEMA
+              null, // SCOPE_TABLE
+              null, // SOURCE_DATA_TYPE
+              "NO" // IS_AUTO_INCREMENT
+            });
+            position += 1;
+          }
+        }
+      }
+    }
+    return columnList;
+  }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
new file mode 100644
index 0000000..297fb80
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo;
+
+public class GetFunctionsJob extends SparkCatalogJob {
+  private final String databasePattern;
+  private final String functionName;
+
+  public GetFunctionsJob(
+      String databasePattern,
+      String functionName,
+      String sessionId,
+      String jobId) {
+    super(sessionId, jobId);
+    this.databasePattern = databasePattern;
+    this.functionName = functionName;
+  }
+
+  @Override
+  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+    List<Object[]> funcList = new ArrayList<Object[]>();
+
+    List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
+    for (String db : databases) {
+      List<Tuple2<FunctionIdentifier, String>> identifiersTypes =
+        seqAsJavaList(catalog.listFunctions(db, functionName));
+      for (Tuple2<FunctionIdentifier, String> identifierType : identifiersTypes) {
+        FunctionIdentifier function = identifierType._1;
+        ExpressionInfo info = catalog.lookupFunctionInfo(function);
+        funcList.add(new Object[] {
+          null, // FUNCTION_CAT
+          function.database().isDefined() ? function.database().get() : null, // FUNCTION_SCHEM
+          function.funcName(), // FUNCTION_NAME
+          info.getUsage() + info.getExtended(), // REMARKS
+          null, // FUNCTION_TYPE
+          info.getClassName() // SPECIFIC_NAME
+        });
+      }
+    }
+    return funcList;
+  }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
new file mode 100644
index 0000000..451bea2
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+
+public class GetSchemasJob extends SparkCatalogJob {
+  private final String schemaName;
+
+  public GetSchemasJob(String schemaName, String sessionId, String jobId) {
+    super(sessionId, jobId);
+    this.schemaName = schemaName;
+  }
+
+  @Override
+  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+    List<String> databases = seqAsJavaList(catalog.listDatabases(schemaName));
+    List<Object[]> schemas = new ArrayList<>();
+    for (String db : databases) {
+      schemas.add(new Object[] {
+        db,
+        DEFAULT_HIVE_CATALOG,
+      });
+    }
+    return schemas;
+  }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
new file mode 100644
index 0000000..a071aef
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+
+public class GetTablesJob extends SparkCatalogJob {
+  private final String databasePattern;
+  private final String tablePattern;
+  private final List<String> tableTypes = new ArrayList<String>();
+
+  public GetTablesJob(
+      String databasePattern,
+      String tablePattern,
+      List<String> tableTypes,
+      String sessionId,
+      String jobId) {
+    super(sessionId, jobId);
+    this.databasePattern = databasePattern;
+    this.tablePattern = tablePattern;
+    if (tableTypes != null) {
+      for (String type : tableTypes) {
+        this.tableTypes.add(type.toUpperCase());
+      }
+    }
+  }
+
+  @Override
+  protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+    List<Object[]> tableList = new ArrayList<>();
+    List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
+    for (String db : databases) {
+      List<TableIdentifier> tableIdentifiers =
+        seqAsJavaList(catalog.listTables(db, tablePattern));
+      for (TableIdentifier tableIdentifier : tableIdentifiers) {
+        CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+        String type = convertTableType(table.tableType().name());
+        if (tableTypes.isEmpty() || tableTypes.contains(type)) {
+          tableList.add(
+            new Object[] {
+              DEFAULT_HIVE_CATALOG,
+              table.database(),
+              table.identifier().table(),
+              type,
+              table.comment().isDefined() ? table.comment().get() : ""
+            });
+        }
+      }
+    }
+    return tableList;
+  }
+
+  private String convertTableType(String originalType) {
+    if (originalType.equals(CatalogTableType.MANAGED().name())) {
+      return ClassicTableTypes.TABLE.name();
+    } else if (originalType.equals(CatalogTableType.EXTERNAL().name())) {
+      return ClassicTableTypes.TABLE.name();
+    } else if (originalType.equals(CatalogTableType.VIEW().name())) {
+      return ClassicTableTypes.VIEW.name();
+    } else {
+      throw new IllegalArgumentException("Invalid spark table type " + originalType);
+    }
+  }
+
+  private enum ClassicTableTypes {
+    TABLE,
+    VIEW,
+  }
+}
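
From the JDBC side this job backs java.sql.DatabaseMetaData.getTables. A hedged sketch of exercising it, assuming the Hive JDBC driver on the classpath and a thrift server at a placeholder address (host, port, and database are assumptions):

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class GetTablesOverJdbc {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10090/default")) {  // assumed endpoint
          DatabaseMetaData meta = conn.getMetaData();
          // Catalog is the empty DEFAULT_HIVE_CATALOG; the schema and table
          // arguments are patterns, handed through to GetTablesJob above.
          try (ResultSet rs = meta.getTables("", "default", "*",
              new String[] { "TABLE", "VIEW" })) {
            while (rs.next()) {
              System.out.println(rs.getString("TABLE_SCHEM") + "."
                  + rs.getString("TABLE_NAME") + " (" + rs.getString("TABLE_TYPE") + ")");
            }
          }
        }
      }
    }

Note that convertTableType maps both MANAGED and EXTERNAL Spark tables to the classic TABLE type, so a TABLE filter matches external tables as well.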
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
new file mode 100644
index 0000000..e86721e
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.List;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public abstract class SparkCatalogJob implements Job<Void> {
+  protected static final String DEFAULT_HIVE_CATALOG = "";
+
+  private final String sessionId;
+  private final String jobId;
+
+  public SparkCatalogJob(String sessionId, String jobId) {
+    this.sessionId = sessionId;
+    this.jobId = jobId;
+  }
+
+  protected abstract List<Object[]> fetchCatalogObjects(SessionCatalog catalog);
+
+  @Override
+  public Void call(JobContext jc) throws Exception {
+    SessionCatalog catalog = ((SparkSession)jc.sparkSession()).sessionState().catalog();
+    List<Object[]> objects = fetchCatalogObjects(catalog);
+
+    ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
+    session.registerCatalogJob(jobId, objects.iterator());
+    return null;
+  }
+}
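
SparkCatalogJob is a small template method: call() resolves the session's SessionCatalog, delegates row production to fetchCatalogObjects, and registers the result iterator with the session state for later fetching. A minimal sketch of a further subclass; GetTableTypesJob is hypothetical and not part of this patch:

    package org.apache.livy.thriftserver.session;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.catalyst.catalog.SessionCatalog;

    public class GetTableTypesJob extends SparkCatalogJob {
      public GetTableTypesJob(String sessionId, String jobId) {
        super(sessionId, jobId);
      }

      @Override
      protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
        // One single-column row per supported table type; the base class
        // takes care of registering the iterator under the job id.
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] { "TABLE" });
        rows.add(new Object[] { "VIEW" });
        return rows;
      }
    }

Keeping registration in the base class gives every metadata operation the same fetch/cleanup lifecycle for free.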
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
index 99eab4d..fac79ad 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.livy.thriftserver.session;
 
+import java.sql.Types;
+
 import org.apache.spark.sql.types.*;
 
 /**
@@ -57,4 +59,115 @@ final class SparkUtils {
     return types;
   }
 
+  /**
+   * Ported from the Spark Hive Thrift server {@code Type} class.
+   * @param type the Spark SQL data type to map
+   * @return the matching {@link java.sql.Types} constant, or {@code Types.OTHER}
+   */
+  public static int toJavaSQLType(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof NullType) {
+      return Types.NULL;
+    } else if (type instanceof BooleanType) {
+      return Types.BOOLEAN;
+    } else if (type instanceof ByteType) {
+      return Types.TINYINT;
+    } else if (type instanceof ShortType) {
+      return Types.SMALLINT;
+    } else if (type instanceof IntegerType) {
+      return Types.INTEGER;
+    } else if (type instanceof LongType) {
+      return Types.BIGINT;
+    } else if (type instanceof FloatType) {
+      return Types.FLOAT;
+    } else if (type instanceof DoubleType) {
+      return Types.DOUBLE;
+    } else if (type instanceof StringType) {
+      return Types.VARCHAR;
+    } else if (type instanceof DecimalType) {
+      return Types.DECIMAL;
+    } else if (type instanceof DateType) {
+      return Types.DATE;
+    } else if (type instanceof TimestampType) {
+      return Types.TIMESTAMP;
+    } else if (type instanceof BinaryType) {
+      return Types.BINARY;
+    } else if (type instanceof ArrayType) {
+      return Types.ARRAY;
+    } else if (type instanceof MapType) {
+      return Types.JAVA_OBJECT;
+    } else if (type instanceof StructType) {
+      return Types.STRUCT;
+    } else {
+      return Types.OTHER;
+    }
+  }
+
+  /**
+   * Ported from the Spark Hive Thrift server {@code TypeDescriptor} class.
+   * @param type the Spark SQL data type to inspect
+   * @return the column size for the type, or {@code null} if not applicable
+   */
+  public static Integer getColumnSize(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof ByteType) {
+      return 3;
+    } else if (type instanceof ShortType) {
+      return 5;
+    } else if (type instanceof IntegerType) {
+      return 10;
+    } else if (type instanceof LongType) {
+      return 19;
+    } else if (type instanceof FloatType) {
+      return 7;
+    } else if (type instanceof DoubleType) {
+      return 15;
+    } else if (type instanceof DecimalType) {
+      return ((DecimalType)type).precision();
+    } else if (type instanceof StringType || type instanceof BinaryType || type instanceof MapType
+        || type instanceof ArrayType || type instanceof StructType) {
+      return Integer.MAX_VALUE;
+    } else if (type instanceof DateType) {
+      return 10;
+    } else if (type instanceof TimestampType) {
+      return 29;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Ported from the Spark Hive Thrift server {@code TypeDescriptor} class.
+   * @param type the Spark SQL data type to inspect
+   * @return the decimal digit count for the type, or {@code null} if not applicable
+   */
+  public static Integer getDecimalDigits(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof BooleanType || type instanceof ByteType || type instanceof ShortType
+        || type instanceof IntegerType || type instanceof LongType) {
+      return 0;
+    } else if (type instanceof FloatType) {
+      return 7;
+    } else if (type instanceof DoubleType) {
+      return 15;
+    } else if (type instanceof DecimalType) {
+      return ((DecimalType)type).scale();
+    } else if (type instanceof TimestampType) {
+      return 9;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Ported from the Spark Hive Thrift server {@code Type} class.
+   * @param type the Spark SQL data type to inspect
+   * @return the numeric precision radix, or {@code null} for non-numeric types
+   */
+  public static Integer getNumPrecRadix(org.apache.spark.sql.types.DataType type) {
+    if (type instanceof ByteType || type instanceof ShortType || type instanceof IntegerType
+        || type instanceof LongType || type instanceof FloatType || type instanceof DoubleType
+        || type instanceof DecimalType) {
+      return 10;
+    } else {
+      return null;
+    }
+  }
 }
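
These helpers feed the JDBC-facing columns produced by GetColumnsJob (DATA_TYPE, COLUMN_SIZE, DECIMAL_DIGITS, NUM_PREC_RADIX). A few illustrative expectations, written as a JUnit-style sketch in the same package since SparkUtils is package-private; the test class itself is hypothetical:

    package org.apache.livy.thriftserver.session;

    import static org.junit.Assert.*;

    import java.sql.Types;

    import org.apache.spark.sql.types.DataTypes;
    import org.junit.Test;

    public class SparkUtilsTypeMappingTest {
      @Test
      public void typeMappings() {
        assertEquals(Types.VARCHAR, SparkUtils.toJavaSQLType(DataTypes.StringType));
        assertEquals(Types.DECIMAL,
            SparkUtils.toJavaSQLType(DataTypes.createDecimalType(10, 2)));
        // DECIMAL(10, 2): precision is the column size, scale the decimal digits.
        assertEquals(Integer.valueOf(10),
            SparkUtils.getColumnSize(DataTypes.createDecimalType(10, 2)));
        assertEquals(Integer.valueOf(2),
            SparkUtils.getDecimalDigits(DataTypes.createDecimalType(10, 2)));
        // Non-numeric types report no radix.
        assertNull(SparkUtils.getNumPrecRadix(DataTypes.StringType));
      }
    }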
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
index 1d71259..5378270 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
@@ -77,6 +77,7 @@ class ThriftSessionState {
     this.sessionId = sessionId;
     this.statements = new ConcurrentHashMap<>();
     this.spark = ctx.<SparkSession>sparkSession().newSession();
+    this.catalogJobStates = new ConcurrentHashMap<>();
   }
 
   SparkSession spark() {
@@ -126,4 +127,35 @@ class ThriftSessionState {
     }
   }
 
+
+  private final ConcurrentMap<String, CatalogJobState> catalogJobStates;
+
+  void registerCatalogJob(String jobId, Iterator<Object[]> results) {
+    checkNotNull(jobId, "No catalog job ID.");
+    CatalogJobState state = new CatalogJobState(results);
+    if (catalogJobStates.putIfAbsent(jobId, state) != null) {
+      throw new IllegalStateException(
+          String.format("Catalog job %s already registered.", jobId));
+    }
+  }
+
+  CatalogJobState findCatalogJob(String jobId) {
+    checkNotNull(jobId, "No catalog job ID.");
+    CatalogJobState state = catalogJobStates.get(jobId);
+    if (state == null) {
+      throw catalogJobNotFound(jobId);
+    }
+    return state;
+  }
+
+  boolean cleanupCatalogJob(String jobId) {
+    checkNotNull(jobId, "No catalog job ID.");
+    return catalogJobStates.remove(jobId) != null;
+  }
+
+  private NoSuchElementException catalogJobNotFound(String jobId) {
+    return new NoSuchElementException(
+        String.format("Catalog job %s not found in session %s.", jobId, sessionId));
+  }
+
 }
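
Taken together, the new methods give every catalog job a register/find/cleanup lifecycle keyed by job id. The sketch below is illustrative glue, not code from the patch; only the method signatures it calls are the ones added above:

    import java.util.Iterator;

    import org.apache.livy.JobContext;

    void catalogJobLifecycle(JobContext jc, String sessionId, String jobId,
        Iterator<Object[]> results) {
      ThriftSessionState session = ThriftSessionState.get(jc, sessionId);

      // SparkCatalogJob.call(): publish rows under the job id; registering
      // the same id twice throws IllegalStateException.
      session.registerCatalogJob(jobId, results);

      // FetchCatalogResultJob: resolve the state (throws NoSuchElementException
      // for unknown ids) and drain rows from it page by page.
      CatalogJobState state = session.findCatalogJob(jobId);

      // CleanupCatalogResultJob: drop the state once the operation closes;
      // returns whether anything was actually removed.
      boolean removed = session.cleanupCatalogJob(jobId);
    }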
diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
index 4dd30a2..addc630 100644
--- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
+++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
@@ -19,6 +19,8 @@ package org.apache.livy.thriftserver.session;
 
 import java.net.URI;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -124,9 +126,58 @@ public class ThriftSessionTest {
     assertTrue(cols[0].getNulls().get(0));
 
     assertTrue(waitFor(new CleanupStatementJob(s1, st4)));
-
     // Tear down the session.
     waitFor(new UnregisterSessionJob(s1));
+
+    String s3 = nextSession();
+    waitFor(new RegisterSessionJob(s3));
+    String getSchemaJobId = "test_get_schema_job";
+    waitFor(new GetSchemasJob("default", s3, getSchemaJobId));
+    List<Object[]> schemas = waitFor(
+        new FetchCatalogResultJob(s3, getSchemaJobId, Integer.MAX_VALUE));
+    assertEquals(1, schemas.size());
+    assertEquals("default", schemas.get(0)[0]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getSchemaJobId)));
+
+    String getTablesJobId = "test_get_tables_job";
+    List<String> testTableTypes = new ArrayList<>();
+    testTableTypes.add("Table");
+    waitFor(new GetTablesJob("default", "*",
+        testTableTypes, s3, getTablesJobId));
+    List<Object[]> tables = waitFor(
+        new FetchCatalogResultJob(s3, getTablesJobId, Integer.MAX_VALUE));
+    assertEquals(1, tables.size());
+    assertEquals("default", tables.get(0)[1]);
+    assertEquals("test", tables.get(0)[2]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getTablesJobId)));
+
+    String getColumnsJobId = "test_get_columns_job";
+    waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId));
+    List<Object[]> columns = waitFor(
+        new FetchCatalogResultJob(s3, getColumnsJobId, Integer.MAX_VALUE));
+    assertEquals(2, columns.size());
+    assertEquals("default", columns.get(0)[1]);
+    assertEquals("test", columns.get(0)[2]);
+    assertEquals("id", columns.get(0)[3]);
+    assertEquals("integer", columns.get(0)[5]);
+    assertEquals("default", columns.get(1)[1]);
+    assertEquals("test", columns.get(1)[2]);
+    assertEquals("desc", columns.get(1)[3]);
+    assertEquals("string", columns.get(1)[5]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getColumnsJobId)));
+
+    String getFunctionsJobId = "test_get_functions_job";
+    waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, getFunctionsJobId));
+    List<Object[]> functions = waitFor(
+        new FetchCatalogResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE));
+    assertEquals(1, functions.size());
+    assertNull(functions.get(0)[1]);
+    assertEquals("unix_timestamp", functions.get(0)[2]);
+    assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", functions.get(0)[5]);
+    assertTrue(waitFor(new CleanupCatalogResultJob(s3, getFunctionsJobId)));
+
+    // Tear down the session.
+    waitFor(new UnregisterSessionJob(s3));
   }
 
   private String nextSession() {