Posted to commits@livy.apache.org by js...@apache.org on 2019/08/16 02:32:30 UTC
[incubator-livy] branch master updated:
[LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions,
GetSchemas, GetTables, GetColumns in Livy thrift server
This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new cae9d97 [LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server
cae9d97 is described below
commit cae9d97185bf371912dcd863dff5babfd9cb704a
Author: yihengwang <yi...@tencent.com>
AuthorDate: Fri Aug 16 10:32:11 2019 +0800
[LIVY-622][LIVY-623][LIVY-624][LIVY-625][THRIFT] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server
## What changes were proposed in this pull request?
In this patch, we add implementations of GetSchemas, GetFunctions, GetTables, and GetColumns to the Livy Thrift server.
https://issues.apache.org/jira/browse/LIVY-622
https://issues.apache.org/jira/browse/LIVY-623
https://issues.apache.org/jira/browse/LIVY-624
https://issues.apache.org/jira/browse/LIVY-625
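For illustration, with this patch a JDBC client reaches the new operations through the standard java.sql.DatabaseMetaData calls, mirroring the tests added below. A minimal Scala sketch, assuming a Livy Thrift server reachable through the Hive JDBC driver (the URL, port, and table name are illustrative assumptions):

    import java.sql.DriverManager

    // Assumed endpoint; any HiveServer2-compatible JDBC URL pointing at the Livy Thrift server.
    val connection = DriverManager.getConnection("jdbc:hive2://localhost:10090/default")
    val metadata = connection.getMetaData

    val schemas = metadata.getSchemas() // served by GetSchemasOperation
    val tables = metadata.getTables("", "default", "*", Array("TABLE")) // GetTablesOperation
    val columns = metadata.getColumns("", "default", "my_table", null) // GetColumnsOperation
    val functions = metadata.getFunctions("", "default", "unix_timestamp") // GetFunctionsOperation

    while (schemas.next()) println(schemas.getString("TABLE_SCHEM"))
    connection.close()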
## How was this patch tested?
Added new unit tests and an integration test, and ran them together with the existing tests.
Author: yihengwang <yi...@tencent.com>
Closes #194 from yiheng/fix_575.
---
.../apache/livy/thriftserver/LivyCLIService.scala | 16 +--
.../livy/thriftserver/LivyOperationManager.scala | 63 ++++++++
.../livy/thriftserver/cli/ThriftCLIService.scala | 17 ++-
.../operation/GetColumnsOperation.scala | 102 +++++++++++++
.../operation/GetFunctionsOperation.scala | 94 ++++++++++++
.../operation/GetSchemasOperation.scala | 63 ++++++++
.../operation/GetTablesOperation.scala | 73 ++++++++++
.../thriftserver/operation/MetadataOperation.scala | 6 +
.../operation/SparkCatalogOperation.scala | 119 ++++++++++++++++
.../livy/thriftserver/ThriftServerSuites.scala | 158 ++++++++++++++++++++-
.../livy/thriftserver/session/CatalogJobState.java | 28 ++++
.../session/CleanupCatalogResultJob.java | 37 +++++
.../livy/thriftserver/session/ColumnBuffer.java | 36 +++++
.../session/FetchCatalogResultJob.java | 51 +++++++
.../livy/thriftserver/session/GetColumnsJob.java | 93 ++++++++++++
.../livy/thriftserver/session/GetFunctionsJob.java | 67 +++++++++
.../livy/thriftserver/session/GetSchemasJob.java | 47 ++++++
.../livy/thriftserver/session/GetTablesJob.java | 92 ++++++++++++
.../livy/thriftserver/session/SparkCatalogJob.java | 50 +++++++
.../livy/thriftserver/session/SparkUtils.java | 113 +++++++++++++++
.../thriftserver/session/ThriftSessionState.java | 32 +++++
.../thriftserver/session/ThriftSessionTest.java | 53 ++++++-
22 files changed, 1395 insertions(+), 15 deletions(-)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
index 725bdc8..3c84b4a 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala
@@ -215,8 +215,8 @@ class LivyCLIService(server: LivyThriftServer)
sessionHandle: SessionHandle,
catalogName: String,
schemaName: String): OperationHandle = {
- // TODO
- throw new HiveSQLException("Operation GET_SCHEMAS is not yet supported")
+ sessionManager.operationManager.getSchemas(
+ sessionHandle, catalogName, schemaName)
}
@throws[HiveSQLException]
@@ -226,8 +226,8 @@ class LivyCLIService(server: LivyThriftServer)
schemaName: String,
tableName: String,
tableTypes: util.List[String]): OperationHandle = {
- // TODO
- throw new HiveSQLException("Operation GET_TABLES is not yet supported")
+ sessionManager.operationManager.getTables(
+ sessionHandle, catalogName, schemaName, tableName, tableTypes)
}
@throws[HiveSQLException]
@@ -243,8 +243,8 @@ class LivyCLIService(server: LivyThriftServer)
schemaName: String,
tableName: String,
columnName: String): OperationHandle = {
- // TODO
- throw new HiveSQLException("Operation GET_COLUMNS is not yet supported")
+ sessionManager.operationManager.getColumns(
+ sessionHandle, catalogName, schemaName, tableName, columnName)
}
@throws[HiveSQLException]
@@ -253,8 +253,8 @@ class LivyCLIService(server: LivyThriftServer)
catalogName: String,
schemaName: String,
functionName: String): OperationHandle = {
- // TODO
- throw new HiveSQLException("Operation GET_FUNCTIONS is not yet supported")
+ sessionManager.operationManager.getFunctions(
+ sessionHandle, catalogName, schemaName, functionName)
}
@throws[HiveSQLException]
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
index e6d48ff..2454185 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala
@@ -187,6 +187,69 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
})
}
+ @throws[HiveSQLException]
+ def getTables(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: util.List[String]): OperationHandle = {
+ executeOperation(sessionHandle, {
+ val op = new GetTablesOperation(
+ sessionHandle,
+ catalogName,
+ schemaName,
+ tableName,
+ tableTypes,
+ livyThriftSessionManager)
+ addOperation(op, sessionHandle)
+ op
+ })
+ }
+
+ @throws[HiveSQLException]
+ def getFunctions(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ functionName: String): OperationHandle = {
+ executeOperation(sessionHandle, {
+ val op = new GetFunctionsOperation(sessionHandle, catalogName, schemaName, functionName,
+ livyThriftSessionManager)
+ addOperation(op, sessionHandle)
+ op
+ })
+ }
+
+ @throws[HiveSQLException]
+ def getSchemas(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String): OperationHandle = {
+ executeOperation(sessionHandle, {
+ val op = new GetSchemasOperation(sessionHandle, catalogName, schemaName,
+ livyThriftSessionManager)
+ addOperation(op, sessionHandle)
+ op
+ })
+ }
+
+ @throws[HiveSQLException]
+ def getColumns(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String): OperationHandle = {
+ executeOperation(sessionHandle, {
+ val op = new GetColumnsOperation(sessionHandle, catalogName, schemaName, tableName,
+ columnName, livyThriftSessionManager)
+ addOperation(op, sessionHandle)
+ op
+ })
+ }
+
/**
* Cancel the running operation unless it is already in a terminal state
*/
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala
index 4a3276f..da108ab 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala
@@ -427,8 +427,8 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
val resp = new TGetSchemasResp
try {
- val opHandle = cliService.getSchemas(
- new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
+ val opHandle = cliService.getSchemas(createSessionHandle(req.getSessionHandle),
+ req.getCatalogName, req.getSchemaName)
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(ThriftCLIService.OK_STATUS)
} catch {
@@ -444,7 +444,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
val resp = new TGetTablesResp
try {
val opHandle = cliService.getTables(
- new SessionHandle(req.getSessionHandle),
+ createSessionHandle(req.getSessionHandle),
req.getCatalogName,
req.getSchemaName,
req.getTableName,
@@ -479,7 +479,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
val resp = new TGetColumnsResp
try {
val opHandle = cliService.getColumns(
- new SessionHandle(req.getSessionHandle),
+ createSessionHandle(req.getSessionHandle),
req.getCatalogName,
req.getSchemaName,
req.getTableName,
@@ -499,7 +499,7 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
val resp = new TGetFunctionsResp
try {
val opHandle = cliService.getFunctions(
- new SessionHandle(req.getSessionHandle),
+ createSessionHandle(req.getSessionHandle),
req.getCatalogName,
req.getSchemaName,
req.getFunctionName)
@@ -728,6 +728,13 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
s"Failed to validate proxy privilege of $realUser for $proxyUser", "08S01", e)
}
}
+
+ private def createSessionHandle(tHandle: TSessionHandle): SessionHandle = {
+ val protocolVersion = cliService.getSessionManager
+ .getSessionInfo(new SessionHandle(tHandle))
+ .protocolVersion
+ new SessionHandle(tHandle, protocolVersion)
+ }
}
object ThriftCLIService {
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
new file mode 100644
index 0000000..c9c106c
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetColumnsOperation.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.session.{GetColumnsJob, GetFunctionsJob}
+
+class GetColumnsOperation(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String,
+ sessionManager: LivyThriftSessionManager)
+ extends SparkCatalogOperation(
+ sessionHandle, OperationType.GET_COLUMNS, sessionManager) with Logging {
+
+ @throws(classOf[HiveSQLException])
+ override protected def runInternal(): Unit = {
+ setState(OperationState.RUNNING)
+ try {
+ rscClient.submit(new GetColumnsJob(
+ convertSchemaPattern(schemaName),
+ convertIdentifierPattern(tableName, datanucleusFormat = true),
+ Option(columnName).map { convertIdentifierPattern(_, datanucleusFormat = false) }.orNull,
+ sessionId,
+ jobId
+ )).get()
+
+ setState(OperationState.FINISHED)
+ } catch {
+ case e: Throwable =>
+ error("Remote job meet an exception: ", e)
+ setState(OperationState.ERROR)
+ throw new HiveSQLException(e)
+ }
+ }
+
+ @throws(classOf[HiveSQLException])
+ override def getResultSetSchema: Schema = {
+ assertState(Seq(OperationState.FINISHED))
+ GetColumnsOperation.SCHEMA
+ }
+}
+
+object GetColumnsOperation {
+ val SCHEMA = Schema(
+ Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."),
+ Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
+ Field("TABLE_NAME", BasicDataType("string"), "Table name."),
+ Field("COLUMN_NAME", BasicDataType("string"), "Column name"),
+ Field("DATA_TYPE", BasicDataType("integer"), "SQL type from java.sql.Types"),
+ Field("TYPE_NAME", BasicDataType("string"),
+ "Data source dependent type name, for a UDT the type name is fully qualified"),
+ Field("COLUMN_SIZE", BasicDataType("integer"), "Column size. For char or date types this is " +
+ "the maximum number of characters, for numeric or decimal types this is precision."),
+ Field("BUFFER_LENGTH", BasicDataType("byte"), "Unused"),
+ Field("DECIMAL_DIGITS", BasicDataType("integer"), "The number of fractional digits"),
+ Field("NUM_PREC_RADIX", BasicDataType("integer"), "Radix (typically either 10 or 2)"),
+ Field("NULLABLE", BasicDataType("integer"), "Is NULL allowed"),
+ Field("REMARKS", BasicDataType("string"), "Comment describing column (may be null)"),
+ Field("COLUMN_DEF", BasicDataType("string"), "Default value (may be null)"),
+ Field("SQL_DATA_TYPE", BasicDataType("integer"), "Unused"),
+ Field("SQL_DATETIME_SUB", BasicDataType("integer"), "Unused"),
+ Field("CHAR_OCTET_LENGTH", BasicDataType("integer"), "For char types the maximum number of " +
+ "bytes in the column"),
+ Field("ORDINAL_POSITION", BasicDataType("integer"), "Index of column in table (starting at 1)"),
+ Field("IS_NULLABLE", BasicDataType("string"), "\"NO\" means column definitely does not " +
+ "allow NULL values; \"YES\" means the column might allow NULL values. An empty string " +
+ "means nobody knows."),
+ Field("SCOPE_CATALOG", BasicDataType("string"), "Catalog of table that is the scope of a " +
+ "reference attribute (null if DATA_TYPE isn't REF)"),
+ Field("SCOPE_SCHEMA", BasicDataType("string"), "Schema of table that is the scope of a " +
+ "reference attribute (null if the DATA_TYPE isn't REF)"),
+ Field("SCOPE_TABLE", BasicDataType("string"), "Table name that this the scope of a " +
+ "reference attribure (null if the DATA_TYPE isn't REF)"),
+ Field("SOURCE_DATA_TYPE", BasicDataType("short"), "Source type of a distinct type or " +
+ "user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't " +
+ "DISTINCT or user-generated REF)"),
+ Field("IS_AUTO_INCREMENT", BasicDataType("string"), "Indicates whether this column is " +
+ "auto incremented.")
+ )
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
new file mode 100644
index 0000000..0e43f16
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetFunctionsOperation.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.session.GetFunctionsJob
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+
+class GetFunctionsOperation(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ functionName: String,
+ sessionManager: LivyThriftSessionManager)
+ extends SparkCatalogOperation(
+ sessionHandle, OperationType.GET_FUNCTIONS, sessionManager) with Logging {
+
+ @throws(classOf[HiveSQLException])
+ override protected def runInternal(): Unit = {
+ setState(OperationState.RUNNING)
+ try {
+ rscClient.submit(new GetFunctionsJob(
+ convertSchemaPattern(schemaName),
+ convertFunctionName(functionName),
+ sessionId,
+ jobId
+ )).get()
+
+ setState(OperationState.FINISHED)
+ } catch {
+ case e: Throwable =>
+ error("Remote job meet an exception: ", e)
+ setState(OperationState.ERROR)
+ throw new HiveSQLException(e)
+ }
+ }
+
+ @throws(classOf[HiveSQLException])
+ override def getResultSetSchema: Schema = {
+ assertState(Seq(OperationState.FINISHED))
+ GetFunctionsOperation.SCHEMA
+ }
+
+ private def convertFunctionName(name: String): String = {
+ if (name == null) {
+ ".*"
+ } else {
+ var escape = false
+ name.flatMap {
+ case c if escape =>
+ if (c != '\\') escape = false
+ c.toString
+ case '\\' =>
+ escape = true
+ ""
+ case '%' => ".*"
+ case '_' => "."
+ case c => Character.toLowerCase(c).toString
+ }
+ }
+ }
+}
+
+object GetFunctionsOperation {
+ val SCHEMA = Schema(
+ Field("FUNCTION_CAT", BasicDataType("string"), "Function catalog (may be null)"),
+ Field("FUNCTION_SCHEM", BasicDataType("string"), "Function schema (may be null)"),
+ Field("FUNCTION_NAME", BasicDataType("string"),
+ "Function name. This is the name used to invoke the function"),
+ Field("REMARKS", BasicDataType("string"), "Explanatory comment on the function"),
+ Field("FUNCTION_TYPE", BasicDataType("integer"),
+ "Kind of function."),
+ Field("SPECIFIC_NAME", BasicDataType("string"),
+ "The name which uniquely identifies this function within its schema")
+ )
+}
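To make convertFunctionName concrete, a few traced examples of the JDBC-to-regex conversion above (values derived by applying the rules; illustrative, not committed test output):

    convertFunctionName(null)                  // => ".*"  (null matches every function)
    convertFunctionName("unix_timestamp")      // '_' => "."  =>  "unix.timestamp"
    convertFunctionName("""unix\_timestamp""") // escaped '_' stays literal  =>  "unix_timestamp"
    convertFunctionName("UPPER%")              // lowercased, '%' => ".*"  =>  "upper.*"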
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
new file mode 100644
index 0000000..6bd0a17
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetSchemasOperation.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli._
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.session.{GetSchemasJob, GetTablesJob}
+
+class GetSchemasOperation(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ sessionManager: LivyThriftSessionManager)
+ extends SparkCatalogOperation(
+ sessionHandle, OperationType.GET_SCHEMAS, sessionManager) with Logging {
+
+ @throws(classOf[HiveSQLException])
+ override protected def runInternal(): Unit = {
+ setState(OperationState.RUNNING)
+ try {
+ rscClient.submit(new GetSchemasJob(
+ convertSchemaPattern(schemaName), sessionId, jobId
+ )).get()
+ setState(OperationState.FINISHED)
+ } catch {
+ case e: Throwable =>
+ error("Remote job meet an exception: ", e)
+ setState(OperationState.ERROR)
+ throw new HiveSQLException(e)
+ }
+ }
+
+ @throws(classOf[HiveSQLException])
+ override def getResultSetSchema: Schema = {
+ assertState(Seq(OperationState.FINISHED))
+ GetSchemasOperation.SCHEMA
+ }
+}
+
+object GetSchemasOperation {
+ val SCHEMA = Schema(
+ Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
+ Field("TABLE_CATALOG", BasicDataType("string"), "Catalog name.")
+ )
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
new file mode 100644
index 0000000..4a939b3
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTablesOperation.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
+
+import org.apache.livy.Logging
+import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.session.GetTablesJob
+
+class GetTablesOperation(
+ sessionHandle: SessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: java.util.List[String],
+ sessionManager: LivyThriftSessionManager)
+ extends SparkCatalogOperation(
+ sessionHandle, OperationType.GET_TABLES, sessionManager) with Logging {
+
+ @throws(classOf[HiveSQLException])
+ override protected def runInternal(): Unit = {
+ setState(OperationState.RUNNING)
+ try {
+ rscClient.submit(new GetTablesJob(
+ convertSchemaPattern(schemaName),
+ convertIdentifierPattern(tableName, datanucleusFormat = true),
+ tableTypes,
+ sessionId,
+ jobId
+ )).get()
+
+ setState(OperationState.FINISHED)
+ } catch {
+ case e: Throwable =>
+ error("Remote job meet an exception: ", e)
+ setState(OperationState.ERROR)
+ throw new HiveSQLException(e)
+ }
+ }
+
+ @throws(classOf[HiveSQLException])
+ override def getResultSetSchema: Schema = {
+ assertState(Seq(OperationState.FINISHED))
+ GetTablesOperation.SCHEMA
+ }
+}
+
+object GetTablesOperation {
+ val SCHEMA = Schema(
+ Field("TABLE_CAT", BasicDataType("string"), "Catalog name. NULL if not applicable."),
+ Field("TABLE_SCHEM", BasicDataType("string"), "Schema name."),
+ Field("TABLE_NAME", BasicDataType("string"), "Table name."),
+ Field("TABLE_TYPE", BasicDataType("string"), "The table type, e.g. \"TABLE\", \"VIEW\", etc."),
+ Field("REMARKS", BasicDataType("string"), "Comments about the table.")
+ )
+}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
index 4db3929..4689f26 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/MetadataOperation.scala
@@ -21,6 +21,12 @@ import org.apache.hive.service.cli.{FetchOrientation, HiveSQLException, Operatio
import org.apache.livy.thriftserver.serde.ThriftResultSet
+/**
+ * MetadataOperation is the base class for operations which do not perform any call on the
+ * Spark side.
+ *
+ * @param sessionHandle the handle of the session the operation belongs to
+ * @param opType the type of the operation
+ */
abstract class MetadataOperation(sessionHandle: SessionHandle, opType: OperationType)
extends Operation(sessionHandle, opType) {
setHasResultSet(true)
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
new file mode 100644
index 0000000..9ed31f7
--- /dev/null
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/SparkCatalogOperation.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.operation
+
+import org.apache.commons.lang.StringUtils
+import org.apache.hive.service.cli._
+
+import org.apache.livy.thriftserver.LivyThriftSessionManager
+import org.apache.livy.thriftserver.serde.ThriftResultSet
+import org.apache.livy.thriftserver.session.{CleanupCatalogResultJob, FetchCatalogResultJob}
+
+/**
+ * SparkCatalogOperation is the base class for operations which need to fetch catalog information
+ * from the Spark session.
+ */
+abstract class SparkCatalogOperation(
+ sessionHandle: SessionHandle,
+ opType: OperationType,
+ sessionManager: LivyThriftSessionManager)
+ extends Operation(sessionHandle, opType) {
+
+ // The initialization needs to be lazy so that it does not block when the instance is created
+ protected lazy val rscClient = {
+ // This call blocks while we wait for the session to be ready.
+ sessionManager.getLivySession(sessionHandle).client.get
+ }
+
+ protected lazy val jobId = {
+ this.opHandle.getHandleIdentifier.getPublicId.toString + "-" +
+ this.opHandle.getHandleIdentifier.getSecretId.toString
+ }
+
+ protected lazy val sessionId = {
+ sessionHandle.getSessionId.toString
+ }
+
+ @throws[HiveSQLException]
+ override def close(): Unit = {
+ val cleaned = rscClient.submit(new CleanupCatalogResultJob(sessionId, jobId)).get()
+ if (!cleaned) {
+ warn(s"Fail to cleanup fetch catalog job (session = ${sessionId}), " +
+ "this message can be ignored if the job failed.")
+ }
+ setState(OperationState.CLOSED)
+ }
+
+ @throws[HiveSQLException]
+ override def cancel(stateAfterCancel: OperationState): Unit = {
+ setState(OperationState.CANCELED)
+ // Fetching catalog metadata is not really a Spark job: it only runs on the driver and cannot be cancelled
+ throw new UnsupportedOperationException("SparkCatalogOperation.cancel()")
+ }
+
+ /**
+ * Convert wildcards and escape sequences from JDBC format to datanucleus/regex format.
+ *
+ * This is ported from Spark Hive Thrift MetaOperation.
+ */
+ protected def convertIdentifierPattern(pattern: String, datanucleusFormat: Boolean): String = {
+ if (pattern == null) {
+ convertPattern("%", datanucleusFormat = true)
+ } else {
+ convertPattern(pattern, datanucleusFormat)
+ }
+ }
+
+ /**
+ * Convert wildcards and escape sequences of a schema pattern from JDBC format to
+ * datanucleus/regex format. The schema pattern also treats the empty string as a wildcard.
+ *
+ * This is ported from Spark Hive Thrift MetaOperation.
+ */
+ protected def convertSchemaPattern(pattern: String): String = {
+ if (StringUtils.isEmpty(pattern)) {
+ convertPattern("%", datanucleusFormat = true)
+ } else {
+ convertPattern(pattern, datanucleusFormat = true)
+ }
+ }
+
+ private def convertPattern(pattern: String, datanucleusFormat: Boolean): String = {
+ val wStr = if (datanucleusFormat) "*" else ".*"
+ pattern
+ .replaceAll("([^\\\\])%", "$1" + wStr)
+ .replaceAll("\\\\%", "%")
+ .replaceAll("^%", wStr)
+ .replaceAll("([^\\\\])_", "$1.")
+ .replaceAll("\\\\_", "_")
+ .replaceAll("^_", ".")
+ }
+
+ override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long): ThriftResultSet = {
+ validateFetchOrientation(orientation)
+ assertState(Seq(OperationState.FINISHED))
+ setHasResultSet(true)
+ val maxRows = maxRowsL.toInt
+ val results = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, maxRows)).get()
+
+ val rowSet = ThriftResultSet.apply(getResultSetSchema, protocolVersion)
+ import scala.collection.JavaConverters._
+ results.asScala.foreach { r => rowSet.addRow(r.asInstanceOf[Array[Any]]) }
+ rowSet
+ }
+}
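As a concrete illustration of the pattern conversion in SparkCatalogOperation, a few traced examples (values derived from the replacement rules above; illustrative, not committed test output):

    // JDBC wildcards: '%' matches any substring, '_' matches a single character.
    convertSchemaPattern("test%")                                        // => "test*"
    convertSchemaPattern("")                                             // empty falls back to "%"  =>  "*"
    convertIdentifierPattern("my_table", datanucleusFormat = false)      // => "my.table"
    convertIdentifierPattern("""my\_table""", datanucleusFormat = false) // escaped '_' stays literal  =>  "my_table"

Note that a null identifier pattern always falls back to convertPattern("%", datanucleusFormat = true), regardless of the flag passed in.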
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index a3d9e88..6411881 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -17,7 +17,7 @@
package org.apache.livy.thriftserver
-import java.sql.{Date, SQLException, Statement}
+import java.sql.{Connection, Date, SQLException, Statement}
import org.apache.livy.LivyConf
@@ -69,6 +69,114 @@ trait CommonThriftTests {
}
assert(!resultSetComplex.next())
}
+
+ def getSchemasTest(connection: Connection): Unit = {
+ val metadata = connection.getMetaData
+ val schemaResultSet = metadata.getSchemas()
+ assert(schemaResultSet.getMetaData.getColumnCount == 2)
+ assert(schemaResultSet.getMetaData.getColumnName(1) == "TABLE_SCHEM")
+ assert(schemaResultSet.getMetaData.getColumnName(2) == "TABLE_CATALOG")
+ schemaResultSet.next()
+ assert(schemaResultSet.getString(1) == "default")
+ assert(!schemaResultSet.next())
+ }
+
+ def getFunctionsTest(connection: Connection): Unit = {
+ val metadata = connection.getMetaData
+
+ val functionResultSet = metadata.getFunctions("", "default", "unix_timestamp")
+ assert(functionResultSet.getMetaData.getColumnCount == 6)
+ assert(functionResultSet.getMetaData.getColumnName(1) == "FUNCTION_CAT")
+ assert(functionResultSet.getMetaData.getColumnName(2) == "FUNCTION_SCHEM")
+ assert(functionResultSet.getMetaData.getColumnName(3) == "FUNCTION_NAME")
+ assert(functionResultSet.getMetaData.getColumnName(4) == "REMARKS")
+ assert(functionResultSet.getMetaData.getColumnName(5) == "FUNCTION_TYPE")
+ assert(functionResultSet.getMetaData.getColumnName(6) == "SPECIFIC_NAME")
+ functionResultSet.next()
+ assert(functionResultSet.getString(3) == "unix_timestamp")
+ assert(functionResultSet.getString(6) ==
+ "org.apache.spark.sql.catalyst.expressions.UnixTimestamp")
+ assert(!functionResultSet.next())
+ }
+
+ def getTablesTest(connection: Connection): Unit = {
+ val statement = connection.createStatement()
+ statement.execute("CREATE TABLE test_get_tables (id integer, desc string) USING json")
+ statement.close()
+
+ val metadata = connection.getMetaData
+ val tablesResultSet = metadata.getTables("", "default", "*", Array("TABLE"))
+ assert(tablesResultSet.getMetaData.getColumnCount == 5)
+ assert(tablesResultSet.getMetaData.getColumnName(1) == "TABLE_CAT")
+ assert(tablesResultSet.getMetaData.getColumnName(2) == "TABLE_SCHEM")
+ assert(tablesResultSet.getMetaData.getColumnName(3) == "TABLE_NAME")
+ assert(tablesResultSet.getMetaData.getColumnName(4) == "TABLE_TYPE")
+ assert(tablesResultSet.getMetaData.getColumnName(5) == "REMARKS")
+
+ tablesResultSet.next()
+ assert(tablesResultSet.getString(3) == "test_get_tables")
+ assert(tablesResultSet.getString(4) == "TABLE")
+ assert(!tablesResultSet.next())
+ }
+
+ def getColumnsTest(connection: Connection): Unit = {
+ val metadata = connection.getMetaData
+ val statement = connection.createStatement()
+ statement.execute("CREATE TABLE test_get_columns (id integer, desc string) USING json")
+ statement.close()
+
+ val columnsResultSet = metadata.getColumns("", "default", "test_get_columns", ".*")
+ assert(columnsResultSet.getMetaData.getColumnCount == 23)
+ columnsResultSet.next()
+ assert(columnsResultSet.getString(1) == "")
+ assert(columnsResultSet.getString(2) == "default")
+ assert(columnsResultSet.getString(3) == "test_get_columns")
+ assert(columnsResultSet.getString(4) == "id")
+ assert(columnsResultSet.getInt(5) == 4)
+ assert(columnsResultSet.getString(6) == "integer")
+ assert(columnsResultSet.getInt(7) == 10)
+ assert(columnsResultSet.getString(8) == null)
+ assert(columnsResultSet.getInt(9) == 0)
+ assert(columnsResultSet.getInt(10) == 10)
+ assert(columnsResultSet.getInt(11) == 1)
+ assert(columnsResultSet.getString(12) == "")
+ assert(columnsResultSet.getString(13) == null)
+ assert(columnsResultSet.getString(14) == null)
+ assert(columnsResultSet.getString(15) == null)
+ assert(columnsResultSet.getString(16) == null)
+ assert(columnsResultSet.getInt(17) == 0)
+ assert(columnsResultSet.getString(18) == "YES")
+ assert(columnsResultSet.getString(19) == null)
+ assert(columnsResultSet.getString(20) == null)
+ assert(columnsResultSet.getString(21) == null)
+ assert(columnsResultSet.getString(22) == null)
+ assert(columnsResultSet.getString(23) == "NO")
+ columnsResultSet.next()
+ assert(columnsResultSet.getString(1) == "")
+ assert(columnsResultSet.getString(2) == "default")
+ assert(columnsResultSet.getString(3) == "test_get_columns")
+ assert(columnsResultSet.getString(4) == "desc")
+ assert(columnsResultSet.getInt(5) == 12)
+ assert(columnsResultSet.getString(6) == "string")
+ assert(columnsResultSet.getInt(7) == Integer.MAX_VALUE)
+ assert(columnsResultSet.getString(8) == null)
+ assert(columnsResultSet.getString(9) == null)
+ assert(columnsResultSet.getString(10) == null)
+ assert(columnsResultSet.getInt(11) == 1)
+ assert(columnsResultSet.getString(12) == "")
+ assert(columnsResultSet.getString(13) == null)
+ assert(columnsResultSet.getString(14) == null)
+ assert(columnsResultSet.getString(15) == null)
+ assert(columnsResultSet.getString(16) == null)
+ assert(columnsResultSet.getInt(17) == 1)
+ assert(columnsResultSet.getString(18) == "YES")
+ assert(columnsResultSet.getString(19) == null)
+ assert(columnsResultSet.getString(20) == null)
+ assert(columnsResultSet.getString(21) == null)
+ assert(columnsResultSet.getString(22) == null)
+ assert(columnsResultSet.getString(23) == "NO")
+ assert(!columnsResultSet.next())
+ }
}
class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
@@ -163,6 +271,30 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest
assert(caught.getMessage.contains("Table or view not found: `global_temp`.`invalid_table`"))
}
}
+
+ test("fetch schemas") {
+ withJdbcConnection { connection =>
+ getSchemasTest(connection)
+ }
+ }
+
+ test("fetch functions") {
+ withJdbcConnection { connection =>
+ getFunctionsTest(connection)
+ }
+ }
+
+ test("fetch tables") {
+ withJdbcConnection { connection =>
+ getTablesTest(connection)
+ }
+ }
+
+ test("fetch column") {
+ withJdbcConnection { connection =>
+ getColumnsTest(connection)
+ }
+ }
}
class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests {
@@ -175,4 +307,28 @@ class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests
dataTypesTest(statement, supportMap)
}
}
+
+ test("fetch schemas") {
+ withJdbcConnection { connection =>
+ getSchemasTest(connection)
+ }
+ }
+
+ test("fetch functions") {
+ withJdbcConnection { connection =>
+ getFunctionsTest(connection)
+ }
+ }
+
+ test("fetch tables") {
+ withJdbcConnection { connection =>
+ getTablesTest(connection)
+ }
+ }
+
+ test("fetch column") {
+ withJdbcConnection { connection =>
+ getColumnsTest(connection)
+ }
+ }
}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java
new file mode 100644
index 0000000..5571574
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CatalogJobState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.Iterator;
+
+public class CatalogJobState {
+ final Iterator<Object[]> iter;
+
+ public CatalogJobState(Iterator<Object[]> iter) {
+ this.iter = iter;
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java
new file mode 100644
index 0000000..b9444ca
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupCatalogResultJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public class CleanupCatalogResultJob implements Job<Boolean> {
+ private final String sessionId;
+ private final String jobId;
+
+ public CleanupCatalogResultJob(String sessionId, String jobId) {
+ this.sessionId = sessionId;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public Boolean call(JobContext jc) throws Exception {
+ ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
+ return session.cleanupCatalogJob(jobId);
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
index 4408586..d4ec747 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java
@@ -204,6 +204,42 @@ public class ColumnBuffer {
return nulls != null ? BitSet.valueOf(nulls) : new BitSet();
}
+ public ColumnBuffer extractSubset(int start, int end) {
+ ColumnBuffer subset = new ColumnBuffer(type);
+ subset.currentSize = end - start;
+ subset.ensureCapacity();
+ switch (type) {
+ case BOOLEAN:
+ System.arraycopy(bools, start, subset.bools, 0, end - start);
+ break;
+ case BYTE:
+ System.arraycopy(bytes, start, subset.bytes, 0, end - start);
+ break;
+ case SHORT:
+ System.arraycopy(shorts, start, subset.shorts, 0, end - start);
+ break;
+ case INTEGER:
+ System.arraycopy(ints, start, subset.ints, 0, end - start);
+ break;
+ case LONG:
+ System.arraycopy(longs, start, subset.longs, 0, end - start);
+ break;
+ case FLOAT:
+ System.arraycopy(floats, start, subset.floats, 0, end - start);
+ break;
+ case DOUBLE:
+ System.arraycopy(doubles, start, subset.doubles, 0, end - start);
+ break;
+ case BINARY:
+ System.arraycopy(buffers, start, subset.buffers, 0, end - start);
+ break;
+ case STRING:
+ System.arraycopy(strings, start, subset.strings, 0, end - start);
+ break;
+ }
+ return subset;
+ }
+
private boolean isNull(int index) {
if (nulls == null) {
return false;
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java
new file mode 100644
index 0000000..9654b02
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/FetchCatalogResultJob.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public class FetchCatalogResultJob implements Job<List<Object[]>> {
+ private final String sessionId;
+ private final String jobId;
+ private final int maxRows;
+
+ public FetchCatalogResultJob(String sessionId, String jobId, int maxRows) {
+ this.sessionId = sessionId;
+ this.jobId = jobId;
+ this.maxRows = maxRows;
+ }
+
+ @Override
+ public List<Object[]> call(JobContext jc) throws Exception {
+ ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
+ Iterator<Object[]> iterator = session.findCatalogJob(jobId).iter;
+
+ List<Object[]> result = new ArrayList<>();
+ int n = 0;
+ while (iterator.hasNext() && n < maxRows) {
+ result.add(iterator.next());
+ n += 1;
+ }
+ return result;
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
new file mode 100644
index 0000000..bc2bd73
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetColumnsJob.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.types.StructField;
+
+public class GetColumnsJob extends SparkCatalogJob {
+ private final String databasePattern;
+ private final String tablePattern;
+ private final String columnPattern;
+
+ public GetColumnsJob(
+ String databasePattern,
+ String tablePattern,
+ String columnPattern,
+ String sessionId,
+ String jobId) {
+ super(sessionId, jobId);
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.columnPattern = columnPattern;
+ }
+
+ @Override
+ protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+ List<Object[]> columnList = new ArrayList<Object[]>();
+ List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
+
+ for (String db : databases) {
+ List<TableIdentifier> tableIdentifiers =
+ seqAsJavaList(catalog.listTables(db, tablePattern));
+ for (TableIdentifier tableIdentifier : tableIdentifiers) {
+ CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+ List<StructField> fields = seqAsJavaList(table.schema());
+ int position = 0;
+ for (StructField field : fields) {
+ if (columnPattern == null || field.name().matches(columnPattern)) {
+ columnList.add(new Object[] {
+ DEFAULT_HIVE_CATALOG,
+ table.database(),
+ table.identifier().table(),
+ field.name(),
+ SparkUtils.toJavaSQLType(field.dataType()), // DATA_TYPE
+ field.dataType().typeName(),
+ SparkUtils.getColumnSize(field.dataType()), // COLUMN_SIZE
+ null, // BUFFER_LENGTH, unused,
+ SparkUtils.getDecimalDigits(field.dataType()),
+ SparkUtils.getNumPrecRadix(field.dataType()),
+ field.nullable() ? 1 : 0,
+ field.getComment().isDefined() ? field.getComment().get() : "",
+ null, // COLUMN_DEF
+ null, // SQL_DATA_TYPE
+ null, // SQL_DATETIME_SUB
+ null, // CHAR_OCTET_LENGTH
+ position,
+ field.nullable() ? "YES" : "NO", // IS_NULLABLE
+ null, // SCOPE_CATALOG
+ null, // SCOPE_SCHEMA
+ null, // SCOPE_TABLE
+ null, // SOURCE_DATA_TYPE
+ "NO" // IS_AUTO_INCREMENT
+ });
+ position += 1;
+ }
+ }
+ }
+ }
+ return columnList;
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
new file mode 100644
index 0000000..297fb80
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetFunctionsJob.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo;
+
+public class GetFunctionsJob extends SparkCatalogJob {
+ private final String databasePattern;
+ private final String functionName;
+
+ public GetFunctionsJob(
+ String databasePattern,
+ String functionName,
+ String sessionId,
+ String jobId) {
+ super(sessionId, jobId);
+ this.databasePattern = databasePattern;
+ this.functionName = functionName;
+ }
+
+ @Override
+ protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+ List<Object[]> funcList = new ArrayList<Object[]>();
+
+ List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
+ for (String db : databases) {
+ List<Tuple2<FunctionIdentifier, String>> identifiersTypes =
+ seqAsJavaList(catalog.listFunctions(db, functionName));
+ for (Tuple2<FunctionIdentifier, String> identifierType : identifiersTypes) {
+ FunctionIdentifier function = identifierType._1;
+ ExpressionInfo info = catalog.lookupFunctionInfo(function);
+ funcList.add(new Object[] {
+ null,
+ function.database().isDefined() ? function.database().get() : null,
+ function.funcName(),
+ info.getUsage() + info.getExtended(),
+ null,
+ info.getClassName()
+ });
+ }
+ }
+ return funcList;
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
new file mode 100644
index 0000000..451bea2
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetSchemasJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+
+public class GetSchemasJob extends SparkCatalogJob {
+ private final String schemaName;
+
+ public GetSchemasJob(String schemaName, String sessionId, String jobId) {
+ super(sessionId, jobId);
+ this.schemaName = schemaName;
+ }
+
+ @Override
+ protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+ List<String> databases = seqAsJavaList(catalog.listDatabases(schemaName));
+ List<Object[]> schemas = new ArrayList<>();
+ for (String db : databases) {
+ schemas.add(new Object[] {
+ db,
+ DEFAULT_HIVE_CATALOG,
+ });
+ }
+ return schemas;
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
new file mode 100644
index 0000000..a071aef
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/GetTablesJob.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static scala.collection.JavaConversions.seqAsJavaList;
+
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+
+public class GetTablesJob extends SparkCatalogJob {
+ private final String databasePattern;
+ private final String tablePattern;
+ private final List<String> tableTypes = new ArrayList<String>();
+
+ public GetTablesJob(
+ String databasePattern,
+ String tablePattern,
+ List<String> tableTypes,
+ String sessionId,
+ String jobId) {
+ super(sessionId, jobId);
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ if (tableTypes != null) {
+ for (String type : tableTypes) {
+ this.tableTypes.add(type.toUpperCase());
+ }
+ }
+ }
+
+ @Override
+ protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
+ List<Object[]> tableList = new ArrayList<Object[]>();
+ List<String> databases = seqAsJavaList(catalog.listDatabases(databasePattern));
+ for (String db : databases) {
+ List<TableIdentifier> tableIdentifiers =
+ seqAsJavaList(catalog.listTables(db, tablePattern));
+ for (TableIdentifier tableIdentifier : tableIdentifiers) {
+ CatalogTable table = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
+ String type = convertTableType(table.tableType().name());
+ if (tableTypes.isEmpty() || tableTypes.contains(type)) {
+ tableList.add(
+ new Object[] {
+ DEFAULT_HIVE_CATALOG,
+ table.database(),
+ table.identifier().table(),
+ type,
+ table.comment().isDefined() ? table.comment().get() : ""
+ });
+ }
+ }
+ }
+ return tableList;
+ }
+
+ private String convertTableType(String originalType) {
+ if (originalType.equals(CatalogTableType.MANAGED().name())) {
+ return ClassicTableTypes.TABLE.name();
+ } else if (originalType.equals(CatalogTableType.EXTERNAL().name())) {
+ return ClassicTableTypes.TABLE.name();
+ } else if (originalType.equals(CatalogTableType.VIEW().name())) {
+ return ClassicTableTypes.VIEW.name();
+ } else {
+ throw new IllegalArgumentException("Invalid Spark table type " + originalType);
+ }
+ }
+
+ private enum ClassicTableTypes {
+ TABLE,
+ VIEW,
+ }
+}
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
new file mode 100644
index 0000000..e86721e
--- /dev/null
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkCatalogJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.session;
+
+import java.util.List;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+
+import org.apache.livy.Job;
+import org.apache.livy.JobContext;
+
+public abstract class SparkCatalogJob implements Job<Void> {
+ protected static final String DEFAULT_HIVE_CATALOG = "";
+
+ private final String sessionId;
+ private final String jobId;
+
+ public SparkCatalogJob(String sessionId, String jobId) {
+ this.sessionId = sessionId;
+ this.jobId = jobId;
+ }
+
+ protected abstract List<Object[]> fetchCatalogObjects(SessionCatalog catalog);
+
+ @Override
+ public Void call(JobContext jc) throws Exception {
+ SessionCatalog catalog = ((SparkSession)jc.sparkSession()).sessionState().catalog();
+ List<Object[]> objects = fetchCatalogObjects(catalog);
+
+ ThriftSessionState session = ThriftSessionState.get(jc, sessionId);
+ session.registerCatalogJob(jobId, objects.iterator());
+ return null;
+ }
+}
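SparkCatalogJob factors out the common flow: call() resolves the session's SessionCatalog, delegates to fetchCatalogObjects, and registers the resulting iterator under the job ID so a later FetchCatalogResultJob can consume it. A sketch of what a new subclass could look like (GetDatabaseCountJob is hypothetical, not part of this commit):

    // Hypothetical subclass: returns a single row with the database count.
    public class GetDatabaseCountJob extends SparkCatalogJob {
      public GetDatabaseCountJob(String sessionId, String jobId) {
        super(sessionId, jobId);
      }

      @Override
      protected List<Object[]> fetchCatalogObjects(SessionCatalog catalog) {
        return java.util.Collections.singletonList(
            new Object[] { catalog.listDatabases().size() });
      }
    }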
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
index 99eab4d..fac79ad 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/SparkUtils.java
@@ -17,6 +17,8 @@
package org.apache.livy.thriftserver.session;
+import java.sql.Types;
+
import org.apache.spark.sql.types.*;
/**
@@ -57,4 +59,115 @@ final class SparkUtils {
return types;
}
+ /**
+ * Maps a Spark SQL data type to its JDBC type code; ported from the Spark Hive Thrift server Type class.
+ * @param type the Spark SQL data type to map
+ * @return the corresponding java.sql.Types constant, or Types.OTHER if none matches
+ */
+ public static int toJavaSQLType(org.apache.spark.sql.types.DataType type) {
+ if (type instanceof NullType) {
+ return Types.NULL;
+ } else if (type instanceof BooleanType) {
+ return Types.BOOLEAN;
+ } else if (type instanceof ByteType) {
+ return Types.TINYINT;
+ } else if (type instanceof ShortType) {
+ return Types.SMALLINT;
+ } else if (type instanceof IntegerType) {
+ return Types.INTEGER;
+ } else if (type instanceof LongType) {
+ return Types.BIGINT;
+ } else if (type instanceof FloatType) {
+ return Types.FLOAT;
+ } else if (type instanceof DoubleType) {
+ return Types.DOUBLE;
+ } else if (type instanceof StringType) {
+ return Types.VARCHAR;
+ } else if (type instanceof DecimalType) {
+ return Types.DECIMAL;
+ } else if (type instanceof DateType) {
+ return Types.DATE;
+ } else if (type instanceof TimestampType) {
+ return Types.TIMESTAMP;
+ } else if (type instanceof BinaryType) {
+ return Types.BINARY;
+ } else if (type instanceof ArrayType) {
+ return Types.ARRAY;
+ } else if (type instanceof MapType) {
+ return Types.JAVA_OBJECT;
+ } else if (type instanceof StructType) {
+ return Types.STRUCT;
+ } else {
+ return Types.OTHER;
+ }
+ }
+
+ /**
+ * Returns the column size for a Spark SQL data type; ported from the Spark Hive Thrift server TypeDescriptor class.
+ * @param type the Spark SQL data type
+ * @return the maximum precision or display size, or null if size is not applicable
+ */
+ public static Integer getColumnSize(org.apache.spark.sql.types.DataType type) {
+ if (type instanceof ByteType) {
+ return 3;
+ } else if (type instanceof ShortType) {
+ return 5;
+ } else if (type instanceof IntegerType) {
+ return 10;
+ } else if (type instanceof LongType) {
+ return 19;
+ } else if (type instanceof FloatType) {
+ return 7;
+ } else if (type instanceof DoubleType) {
+ return 15;
+ } else if (type instanceof DecimalType) {
+ return ((DecimalType)type).precision();
+ } else if (type instanceof StringType || type instanceof BinaryType || type instanceof MapType
+ || type instanceof ArrayType || type instanceof StructType) {
+ return Integer.MAX_VALUE;
+ } else if (type instanceof DateType) {
+ return 10;
+ } else if (type instanceof TimestampType) {
+ return 29;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Returns the fractional digits for a Spark SQL data type; ported from the Spark Hive Thrift server TypeDescriptor class.
+ * @param type the Spark SQL data type
+ * @return the decimal digits (scale), or null for types without a fixed scale
+ */
+ public static Integer getDecimalDigits(org.apache.spark.sql.types.DataType type) {
+ if (type instanceof BooleanType || type instanceof ByteType || type instanceof ShortType
+ || type instanceof IntegerType || type instanceof LongType) {
+ return 0;
+ } else if (type instanceof FloatType) {
+ return 7;
+ } else if (type instanceof DoubleType) {
+ return 15;
+ } else if (type instanceof DecimalType) {
+ return ((DecimalType)type).scale();
+ } else if (type instanceof TimestampType) {
+ return 9;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Returns the numeric precision radix for a Spark SQL data type; ported from the Spark Hive Thrift server Type class.
+ * @param type the Spark SQL data type
+ * @return 10 for numeric types, null otherwise
+ */
+ public static Integer getNumPrecRadix(org.apache.spark.sql.types.DataType type) {
+ if (type instanceof ByteType || type instanceof ShortType || type instanceof IntegerType
+ || type instanceof LongType || type instanceof FloatType || type instanceof DoubleType
+ || type instanceof DecimalType) {
+ return 10;
+ } else {
+ return null;
+ }
+ }
}
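The four helpers mirror Hive's Type and TypeDescriptor metadata so GetColumnsJob can populate the JDBC DATA_TYPE, COLUMN_SIZE, DECIMAL_DIGITS and NUM_PREC_RADIX columns straight from a Spark schema. A small illustrative snippet (assuming the caller sits in the same package, since SparkUtils is package-private):

    import java.sql.Types;
    import org.apache.spark.sql.types.DecimalType;

    DecimalType money = new DecimalType(12, 2);
    assert SparkUtils.toJavaSQLType(money) == Types.DECIMAL;
    assert SparkUtils.getColumnSize(money) == 12;    // precision
    assert SparkUtils.getDecimalDigits(money) == 2;  // scale
    assert SparkUtils.getNumPrecRadix(money) == 10;  // base-10 numeric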
diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
index 1d71259..5378270 100644
--- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
+++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
@@ -77,6 +77,7 @@ class ThriftSessionState {
this.sessionId = sessionId;
this.statements = new ConcurrentHashMap<>();
this.spark = ctx.<SparkSession>sparkSession().newSession();
+ this.catalogJobStates = new ConcurrentHashMap<>();
}
SparkSession spark() {
@@ -126,4 +127,35 @@ class ThriftSessionState {
}
}
+
+ private final ConcurrentMap<String, CatalogJobState> catalogJobStates;
+
+ void registerCatalogJob(String jobId, Iterator<Object[]> results) {
+ checkNotNull(jobId, "No catalog job ID.");
+ CatalogJobState state = new CatalogJobState(results);
+ if (catalogJobStates.putIfAbsent(jobId, state) != null) {
+ throw new IllegalStateException(
+ String.format("Catalog job %s already registered.", jobId));
+ }
+ }
+
+ CatalogJobState findCatalogJob(String jobId) {
+ checkNotNull(jobId, "No catalog job ID.");
+ CatalogJobState state = catalogJobStates.get(jobId);
+ if (state == null) {
+ throw catalogJobNotFound(jobId);
+ }
+ return state;
+ }
+
+ boolean cleanupCatalogJob(String jobId) {
+ checkNotNull(jobId, "No catalog job ID.");
+ return catalogJobStates.remove(jobId) != null;
+ }
+
+ private NoSuchElementException catalogJobNotFound(String jobId) {
+ return new NoSuchElementException(
+ String.format("Catalog job %s not found in session %s.", jobId, sessionId));
+ }
+
}
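These additions give catalog jobs the same per-session bookkeeping that statements already have: results are registered once under a job ID, looked up while fetching, and removed on cleanup. A condensed sketch of the lifecycle (illustrative; in the real flow these calls come from SparkCatalogJob, FetchCatalogResultJob and CleanupCatalogResultJob respectively):

    // session is the ThriftSessionState for the current Thrift session.
    session.registerCatalogJob("job-1", rows.iterator());    // throws if the ID is reused
    CatalogJobState state = session.findCatalogJob("job-1"); // throws NoSuchElementException if absent
    boolean removed = session.cleanupCatalogJob("job-1");    // true if the job existed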
diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
index 4dd30a2..addc630 100644
--- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
+++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
@@ -19,6 +19,8 @@ package org.apache.livy.thriftserver.session;
import java.net.URI;
import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -124,9 +126,58 @@ public class ThriftSessionTest {
assertTrue(cols[0].getNulls().get(0));
assertTrue(waitFor(new CleanupStatementJob(s1, st4)));
-
// Tear down the session.
waitFor(new UnregisterSessionJob(s1));
+
+ String s3 = nextSession();
+ waitFor(new RegisterSessionJob(s3));
+ String getSchemaJobId = "test_get_schema_job";
+ waitFor(new GetSchemasJob("default", s3, getSchemaJobId));
+ List<Object[]> schemas = waitFor(
+ new FetchCatalogResultJob(s3, getSchemaJobId, Integer.MAX_VALUE));
+ assertEquals(1, schemas.size());
+ assertEquals("default", schemas.get(0)[0]);
+ assertTrue(waitFor(new CleanupCatalogResultJob(s3, getSchemaJobId)));
+
+ String getTablesJobId = "test_get_tables_job";
+ List<String> testTableTypes = new ArrayList<>();
+ testTableTypes.add("Table");
+ waitFor(new GetTablesJob("default", "*",
+ testTableTypes, s3, getTablesJobId));
+ List<Object[]> tables = waitFor(
+ new FetchCatalogResultJob(s3, getTablesJobId, Integer.MAX_VALUE));
+ assertEquals(1, tables.size());
+ assertEquals("default", tables.get(0)[1]);
+ assertEquals("test", tables.get(0)[2]);
+ assertTrue(waitFor(new CleanupCatalogResultJob(s3, getTablesJobId)));
+
+ String getColumnsJobId = "test_get_columns_job";
+ waitFor(new GetColumnsJob("default", "test", ".*", s3, getColumnsJobId));
+ List<Object[]> columns = waitFor(
+ new FetchCatalogResultJob(s3, getColumnsJobId, Integer.MAX_VALUE));
+ assertEquals(2, columns.size());
+ assertEquals("default", columns.get(0)[1]);
+ assertEquals("test", columns.get(0)[2]);
+ assertEquals("id", columns.get(0)[3]);
+ assertEquals("integer", columns.get(0)[5]);
+ assertEquals("default", columns.get(1)[1]);
+ assertEquals("test", columns.get(1)[2]);
+ assertEquals("desc", columns.get(1)[3]);
+ assertEquals("string", columns.get(1)[5]);
+ assertTrue(waitFor(new CleanupCatalogResultJob(s3, getColumnsJobId)));
+
+ String getFunctionsJobId = "test_get_functions_job";
+ waitFor(new GetFunctionsJob("default", "unix_timestamp", s3, getFunctionsJobId));
+ List<Object[]> functions = waitFor(
+ new FetchCatalogResultJob(s3, getFunctionsJobId, Integer.MAX_VALUE));
+ assertEquals(1, functions.size());
+ assertNull(functions.get(0)[1]);
+ assertEquals("unix_timestamp", functions.get(0)[2]);
+ assertEquals("org.apache.spark.sql.catalyst.expressions.UnixTimestamp", functions.get(0)[5]);
+ assertTrue(waitFor(new CleanupCatalogResultJob(s3, getFunctionsJobId)));
+
+ // Tear down the session.
+ waitFor(new UnregisterSessionJob(s3));
}
private String nextSession() {