You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/02/18 07:35:58 UTC
[spark] branch master updated: [SPARK-24570][SQL] Implement Spark
own GetTablesOperation to fix SQL client tools cannot show tables
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7f53116 [SPARK-24570][SQL] Implement Spark own GetTablesOperation to fix SQL client tools cannot show tables
7f53116 is described below
commit 7f53116f77bac6302bb727769b4a4c684b6b0b5b
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Sun Feb 17 23:35:45 2019 -0800
[SPARK-24570][SQL] Implement Spark own GetTablesOperation to fix SQL client tools cannot show tables
## What changes were proposed in this pull request?
The Navigator in SQL client tools such as [DBeaver](https://dbeaver.io/) uses [`GetTablesOperation`](https://github.com/apache/spark/blob/a7444570764b0a08b7e908dc7931744f9dbdf3c6/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java) to obtain table names.
That operation should use [`metadataHive`](https://github.com/apache/spark/blob/95d172da2b370ff6257bfd6fcd102ac553f6f6af/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala#L52-L53), but it uses [`executionHive`](https://github.com/apache/spark/blob/24f5bbd770033dacdea62555488bfffb61665279/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L93-L95) instead.
This PR implements Spark's own `GetTablesOperation` so that it uses `metadataHive`.
## How was this patch tested?
unit test and manual tests
![image](https://user-images.githubusercontent.com/5399861/47430696-acf77980-d7cc-11e8-824d-f28d78f60a00.png)
![image](https://user-images.githubusercontent.com/5399861/47440576-09649400-d7e1-11e8-97a8-a96f73f70361.png)
Closes #22794 from wangyum/SPARK-24570.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../service/cli/operation/GetTablesOperation.java | 2 +-
.../thriftserver/SparkGetTablesOperation.scala | 99 ++++++++++++++++++++++
.../server/SparkSQLOperationManager.scala | 22 ++++-
.../thriftserver/HiveThriftServer2Suites.scala | 2 +-
.../thriftserver/SparkMetadataOperationSuite.scala | 87 ++++++++++++++++++-
5 files changed, 206 insertions(+), 6 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
index 1a7ca79..2af17a6 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java
@@ -46,7 +46,7 @@ public class GetTablesOperation extends MetadataOperation {
private final String schemaName;
private final String tableName;
private final List<String> tableTypes = new ArrayList<String>();
- private final RowSet rowSet;
+ protected final RowSet rowSet;
private final TableTypeMapping tableTypeMapping;
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
new file mode 100644
index 0000000..3696500
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConverters.seqAsJavaListConverter
+
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.GetTablesOperation
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
+
+/**
+ * Spark's own GetTablesOperation. It answers a GetTables request from the session catalog of
+ * the supplied SQLContext and adds one row to the result set per matching table or view.
+ *
+ * @param sqlContext SQLContext to use
+ * @param parentSession a HiveSession from SessionManager
+ * @param catalogName catalog name. null if not applicable
+ * @param schemaName database name, null or a concrete database name
+ * @param tableName table name pattern
+ * @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW"; null or an empty list
+ *                   disables filtering by type
+ */
+private[hive] class SparkGetTablesOperation(
+    sqlContext: SQLContext,
+    parentSession: HiveSession,
+    catalogName: String,
+    schemaName: String,
+    tableName: String,
+    tableTypes: JList[String])
+  extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) {
+
+  // The `tableTypes` field of GetTablesOperation is private, so inside this class the name
+  // `tableTypes` always refers to the constructor argument. The argument is only read here and
+  // never modified: appending it to itself would duplicate the caller's entries and would throw
+  // on an unmodifiable list.
+
+  /**
+   * Runs the operation: RUNNING, then one row per matching table, then FINISHED.
+   * If listing fails with a non-fatal error the operation is moved to ERROR and the same
+   * exception is rethrown, so the operation is not left in RUNNING.
+   */
+  override def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+    // Always use the latest class loader provided by executionHive's state.
+    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
+    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
+    try {
+      addMatchingTables()
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        setState(OperationState.ERROR)
+        throw e
+    }
+    setState(OperationState.FINISHED)
+  }
+
+  /** Resolves the database and table patterns and appends every accepted table to `rowSet`. */
+  private def addMatchingTables(): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val schemaPattern = convertSchemaPattern(schemaName)
+    val matchingDbs = catalog.listDatabases(schemaPattern)
+
+    if (isAuthV2Enabled) {
+      val privObjs = HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.asJava)
+      val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
+      authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr)
+    }
+
+    val tablePattern = convertIdentifierPattern(tableName, true)
+    // A null or empty type list means the client did not restrict the table types.
+    val acceptAllTypes = tableTypes == null || tableTypes.isEmpty
+    for {
+      dbName <- matchingDbs
+      tableIdentifier <- catalog.listTables(dbName, tablePattern)
+    } {
+      val catalogTable = catalog.getTableMetadata(tableIdentifier)
+      val tableType = tableTypeString(catalogTable.tableType)
+      if (acceptAllTypes || tableTypes.contains(tableType)) {
+        // The first column is always the empty string.
+        // NOTE(review): presumably the catalog column - confirm against the result set schema.
+        val rowData = Array[AnyRef](
+          "",
+          catalogTable.database,
+          catalogTable.identifier.table,
+          tableType,
+          catalogTable.comment.getOrElse(""))
+        rowSet.addRow(rowData)
+      }
+    }
+  }
+
+  /**
+   * Maps a catalog table type to the type name reported to clients: "TABLE" for managed and
+   * external tables, "VIEW" for views.
+   *
+   * @throws IllegalArgumentException for any other table type
+   */
+  private def tableTypeString(tableType: CatalogTableType): String = tableType match {
+    case EXTERNAL | MANAGED => "TABLE"
+    case VIEW => "VIEW"
+    case t =>
+      throw new IllegalArgumentException(s"Unknown table type is found: $t")
+  }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 85b6c71..7947d17 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -17,17 +17,17 @@
package org.apache.spark.sql.hive.thriftserver.server
-import java.util.{Map => JMap}
+import java.util.{List => JList, Map => JMap}
import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.cli._
-import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetSchemasOperation, Operation, OperationManager}
+import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetSchemasOperation}
+import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetSchemasOperation, SparkGetTablesOperation}
import org.apache.spark.sql.internal.SQLConf
/**
@@ -76,6 +76,22 @@ private[thriftserver] class SparkSQLOperationManager()
operation
}
+  /**
+   * Creates a [[SparkGetTablesOperation]] bound to the SQLContext registered for the given
+   * session and records it in `handleToOperation` under its operation handle.
+   *
+   * Fails with an IllegalArgumentException (via `require`) when no SQLContext is registered
+   * for the session handle.
+   */
+  override def newGetTablesOperation(
+      parentSession: HiveSession,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: JList[String]): MetadataOperation = synchronized {
+    val sessionContext = sessionToContexts.get(parentSession.getSessionHandle)
+    require(
+      sessionContext != null,
+      s"Session handle: ${parentSession.getSessionHandle} has not been" +
+        " initialized or had already closed.")
+
+    val getTablesOp = new SparkGetTablesOperation(
+      sessionContext,
+      parentSession,
+      catalogName,
+      schemaName,
+      tableName,
+      tableTypes)
+    handleToOperation.put(getTablesOp.getHandle, getTablesOp)
+    logDebug(s"Created GetTablesOperation with session=$parentSession.")
+    getTablesOp
+  }
+
def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
val iterator = confMap.entrySet().iterator()
while (iterator.hasNext) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index f9509ae..0f53fcd 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -280,7 +280,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
var defaultV2: String = null
var data: ArrayBuffer[Int] = null
- withMultipleConnectionJdbcStatement("test_map")(
+ withMultipleConnectionJdbcStatement("test_map", "db1.test_map2")(
// create table
{ statement =>
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index 9a997ae..bf99823 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.thriftserver
-import java.util.Properties
+import java.util.{Arrays => JArrays, List => JList, Properties}
import org.apache.hive.jdbc.{HiveConnection, HiveQueryResultSet, Utils => JdbcUtils}
import org.apache.hive.service.auth.PlainSaslHelper
@@ -100,4 +100,89 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
}
}
}
+
+  test("Spark's own GetTablesOperation(SparkGetTablesOperation)") {
+    // Opens a fresh Thrift session, sends a GetTables request built from the three filters and
+    // passes the resulting HiveQueryResultSet to `f`. All client resources are closed afterwards.
+    def testGetTablesOperation(
+        schema: String,
+        tableNamePattern: String,
+        tableTypes: JList[String])(f: HiveQueryResultSet => Unit): Unit = {
+      val rawTransport = new TSocket("localhost", serverPort)
+      val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties)
+      val user = System.getProperty("user.name")
+      val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
+      val client = new TCLIService.Client(new TBinaryProtocol(transport))
+      transport.open()
+
+      var rs: HiveQueryResultSet = null
+
+      try {
+        val openResp = client.OpenSession(new TOpenSessionReq)
+        val sessHandle = openResp.getSessionHandle
+
+        val getTableReq = new TGetTablesReq(sessHandle)
+        getTableReq.setSchemaName(schema)
+        getTableReq.setTableName(tableNamePattern)
+        getTableReq.setTableTypes(tableTypes)
+
+        val getTableResp = client.GetTables(getTableReq)
+
+        JdbcUtils.verifySuccess(getTableResp.getStatus)
+
+        rs = new HiveQueryResultSet.Builder(connection)
+          .setClient(client)
+          .setSessionHandle(sessHandle)
+          .setStmtHandle(getTableResp.getOperationHandle)
+          .build()
+
+        f(rs)
+      } finally {
+        // `rs` is still null when anything above failed before the result set was built.
+        // Closing it unconditionally would raise a NullPointerException that hides that failure.
+        if (rs != null) {
+          rs.close()
+        }
+        connection.close()
+        transport.close()
+        rawTransport.close()
+      }
+    }
+
+    // Checks that the result set starts with exactly `tableNames`, in order. For an empty
+    // expectation the result set must be empty. Rows after the expected ones are not inspected.
+    def checkResult(tableNames: Seq[String], rs: HiveQueryResultSet): Unit = {
+      if (tableNames.nonEmpty) {
+        tableNames.foreach { expectedName =>
+          assert(rs.next())
+          assert(rs.getString("TABLE_NAME") === expectedName)
+        }
+      } else {
+        assert(!rs.next())
+      }
+    }
+
+    withJdbcStatement("table1", "table2") { statement =>
+      try {
+        Seq(
+          "CREATE TABLE table1(key INT, val STRING)",
+          "CREATE TABLE table2(key INT, val STRING)",
+          "CREATE VIEW view1 AS SELECT * FROM table2").foreach(statement.execute)
+
+        testGetTablesOperation("%", "%", null) { rs =>
+          checkResult(Seq("table1", "table2", "view1"), rs)
+        }
+
+        testGetTablesOperation("%", "table1", null) { rs =>
+          checkResult(Seq("table1"), rs)
+        }
+
+        testGetTablesOperation("%", "table_not_exist", null) { rs =>
+          checkResult(Seq.empty, rs)
+        }
+
+        testGetTablesOperation("%", "%", JArrays.asList("TABLE")) { rs =>
+          checkResult(Seq("table1", "table2"), rs)
+        }
+
+        testGetTablesOperation("%", "%", JArrays.asList("VIEW")) { rs =>
+          checkResult(Seq("view1"), rs)
+        }
+
+        testGetTablesOperation("%", "%", JArrays.asList("TABLE", "VIEW")) { rs =>
+          checkResult(Seq("table1", "table2", "view1"), rs)
+        }
+      } finally {
+        // Only table1 and table2 are handed to withJdbcStatement, so the view created above is
+        // removed explicitly to avoid leaving it behind for later tests.
+        statement.execute("DROP VIEW IF EXISTS view1")
+      }
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org