Posted to commits@spark.apache.org by we...@apache.org on 2016/10/14 07:54:07 UTC
spark git commit: [SPARK-17903][SQL] MetastoreRelation should talk to external catalog instead of hive client
Repository: spark
Updated Branches:
refs/heads/master 6c29b3de7 -> 2fb12b0a3
[SPARK-17903][SQL] MetastoreRelation should talk to external catalog instead of hive client
## What changes were proposed in this pull request?
`HiveExternalCatalog` should be the only interface that talks to the Hive metastore. This patch makes `MetastoreRelation` use `ExternalCatalog` instead of `HiveClient` to interact with the Hive metastore, and adds the missing `listPartitionsByFilter` API to `ExternalCatalog`.
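For illustration, a minimal sketch of the call path after this change (`relation` and `predicates` are assumed to be in scope; this mirrors the `MetastoreRelation` diff below):

```scala
// Sketch only: partition listing now goes through the ExternalCatalog
// abstraction rather than a HiveClient held inside MetastoreRelation.
val partitions = sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
  relation.catalogTable.database,
  relation.catalogTable.identifier.table,
  predicates)
```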
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <we...@databricks.com>
Closes #15460 from cloud-fan/relation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fb12b0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fb12b0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fb12b0a
Branch: refs/heads/master
Commit: 2fb12b0a33deeeadfac451095f64dea6c967caac
Parents: 6c29b3d
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Oct 14 15:53:50 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Oct 14 15:53:50 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/catalog/ExternalCatalog.scala | 13 +++++++++++++
.../sql/catalyst/catalog/InMemoryCatalog.scala | 8 ++++++++
.../spark/sql/hive/HiveExternalCatalog.scala | 8 ++++++++
.../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +++----
.../spark/sql/hive/MetastoreRelation.scala | 19 ++++++++++++-------
.../org/apache/spark/sql/hive/TableReader.scala | 3 +--
.../spark/sql/hive/client/HiveClient.scala | 15 +++------------
.../spark/sql/hive/client/HiveClientImpl.scala | 10 ++++++----
.../sql/hive/HiveExternalCatalogSuite.scala | 9 +++++++++
.../spark/sql/hive/MetastoreRelationSuite.scala | 2 +-
.../spark/sql/hive/client/VersionsSuite.scala | 4 ++--
11 files changed, 66 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index dd93b46..348d3d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
+import org.apache.spark.sql.catalyst.expressions.Expression
/**
@@ -196,6 +197,18 @@ abstract class ExternalCatalog {
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
+ /**
+ * List the metadata of selected partitions according to the given partition predicates.
+ *
+ * @param db database name
+ * @param table table name
+ * @param predicates partition predicates
+ */
+ def listPartitionsByFilter(
+ db: String,
+ table: String,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition]
+
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
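For context, a hedged sketch of exercising the new API (the `externalCatalog` instance and the `db1.tbl1` table are assumptions; the predicate DSL is the same one used in `HiveExternalCatalogSuite` below):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Assume `externalCatalog: ExternalCatalog` is backed by a Hive metastore and
// db1.tbl1 is partitioned by an int column `a`. Ask the metastore to return
// only the partitions satisfying a = 1.
val selected = externalCatalog.listPartitionsByFilter("db1", "tbl1", Seq('a.int === 1))
selected.foreach(p => println(p.spec))
```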
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 3e31127..49280f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.StringUtils
/**
@@ -477,6 +478,13 @@ class InMemoryCatalog(
catalog(db).tables(table).partitions.values.toSeq
}
+ override def listPartitionsByFilter(
+ db: String,
+ table: String,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+ throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.")
+ }
+
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
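Since the in-memory catalog throws rather than pretending to prune, a caller that must work against either implementation would have to fall back explicitly. A hypothetical defensive sketch (the fallback then has to apply the predicates itself):

```scala
// Hypothetical caller: use metastore-side pruning when available, otherwise
// list everything and leave predicate evaluation to the caller.
val partitions =
  try {
    catalog.listPartitionsByFilter(db, table, predicates)
  } catch {
    case _: UnsupportedOperationException =>
      catalog.listPartitions(db, table) // unpruned; apply `predicates` downstream
  }
```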
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 237b829..b5d93c3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -646,6 +647,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.getPartitions(db, table, partialSpec)
}
+ override def listPartitionsByFilter(
+ db: String,
+ table: String,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+ client.getPartitionsByFilter(db, table, predicates)
+ }
+
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8410a2e..c44f0ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -44,8 +44,6 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
- private val client =
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
@@ -104,7 +102,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
- new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
+ val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
+ new Path(new Path(dbLocation), tblName).toString
}
def lookupRelation(
@@ -129,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
} else {
val qualifiedTable =
MetastoreRelation(
- qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession)
+ qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 33f0ecf..da809cf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -43,7 +43,6 @@ private[hive] case class MetastoreRelation(
databaseName: String,
tableName: String)
(val catalogTable: CatalogTable,
- @transient private val client: HiveClient,
@transient private val sparkSession: SparkSession)
extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
@@ -59,7 +58,7 @@ private[hive] case class MetastoreRelation(
Objects.hashCode(databaseName, tableName, output)
}
- override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: client :: sparkSession :: Nil
+ override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
private def toHiveColumn(c: StructField): FieldSchema = {
new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
@@ -146,11 +145,18 @@ private[hive] case class MetastoreRelation(
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
- private lazy val allPartitions: Seq[CatalogTablePartition] = client.getPartitions(catalogTable)
+ private lazy val allPartitions: Seq[CatalogTablePartition] = {
+ sparkSession.sharedState.externalCatalog.listPartitions(
+ catalogTable.database,
+ catalogTable.identifier.table)
+ }
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
- client.getPartitionsByFilter(catalogTable, predicates)
+ sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
+ catalogTable.database,
+ catalogTable.identifier.table,
+ predicates)
} else {
allPartitions
}
@@ -234,8 +240,7 @@ private[hive] case class MetastoreRelation(
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
override def inputFiles: Array[String] = {
- val partLocations = client
- .getPartitionsByFilter(catalogTable, Nil)
+ val partLocations = allPartitions
.flatMap(_.storage.locationUri)
.toArray
if (partLocations.nonEmpty) {
@@ -248,6 +253,6 @@ private[hive] case class MetastoreRelation(
}
override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName)(catalogTable, client, sparkSession)
+ MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
}
}
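The branch in `getHiveQlPartitions` above is controlled by `spark.sql.hive.metastorePartitionPruning`. A brief sketch of the two behaviors (`spark` is an assumed `SparkSession`):

```scala
// With pruning on, predicates are pushed to the metastore via
// listPartitionsByFilter; with it off, all partitions are fetched once and
// cached in `allPartitions`, mimicking Spark < 1.5 behavior.
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
```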
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 2a54163..aaf30f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -149,8 +149,7 @@ class HadoopTableReader(
* subdirectory of each partition being read. If None, then all files are accepted.
*/
def makeRDDForPartitionedTable(
- partitionToDeserializer: Map[HivePartition,
- Class[_ <: Deserializer]],
+ partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
filterOpt: Option[PathFilter]): RDD[InternalRow] = {
// SPARK-5068: get FileStatus and do the filtering locally when the path does not exist
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 984d23b..9ee3d62 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -172,24 +172,15 @@ private[hive] trait HiveClient {
* Returns the partitions for the given table that match the supplied partition spec.
* If no partition spec is specified, all partitions are returned.
*/
- final def getPartitions(
+ def getPartitions(
db: String,
table: String,
- partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
- getPartitions(getTable(db, table), partialSpec)
- }
-
- /**
- * Returns the partitions for the given table that match the supplied partition spec.
- * If no partition spec is specified, all partitions are returned.
- */
- def getPartitions(
- table: CatalogTable,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(
- table: CatalogTable,
+ db: String,
+ table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition]
/** Loads a static partition into an existing table. */
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dd33d75..5c8f7ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -525,9 +525,10 @@ private[hive] class HiveClientImpl(
* If no partition spec is specified, all partitions are returned.
*/
override def getPartitions(
- table: CatalogTable,
+ db: String,
+ table: String,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(getTable(db, table))
spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
@@ -535,9 +536,10 @@ private[hive] class HiveClientImpl(
}
override def getPartitionsByFilter(
- table: CatalogTable,
+ db: String,
+ table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table)
+ val hiveTable = toHiveTable(getTable(db, table))
shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
}
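With the narrowed `HiveClient` interface, callers now pass names and the client resolves the `CatalogTable` itself (one extra `getTable` round-trip inside `withHiveState`). A usage sketch modeled on the `VersionsSuite` change below (the `client: HiveClient` instance is assumed to be already constructed against a live metastore):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

// Fetch only the partitions of default.src_part where key2 = 1.
val result = client.getPartitionsByFilter(
  "default", "src_part",
  Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
```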
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 26c2549..efa0beb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.dsl.expressions._
/**
* Test suite for the [[HiveExternalCatalog]].
@@ -43,4 +44,12 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
externalCatalog.client.reset()
}
+ import utils._
+
+ test("list partitions by filter") {
+ val catalog = newBasicCatalog()
+ val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1))
+ assert(selectedPartitions.length == 1)
+ assert(selectedPartitions.head.spec == part1.spec)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
index 2f3055d..c28e41a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
@@ -29,7 +29,7 @@ class MetastoreRelationSuite extends SparkFunSuite {
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = StructType(StructField("a", IntegerType, true) :: Nil))
- val relation = MetastoreRelation("db", "test")(table, null, null)
+ val relation = MetastoreRelation("db", "test")(table, null)
// No exception should be thrown
relation.makeCopy(Array("db", "test"))
http://git-wip-us.apache.org/repos/asf/spark/blob/2fb12b0a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a10957..c158bf1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -295,12 +295,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: getPartitions(catalogTable)") {
- assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
+ assert(2 == client.getPartitions("default", "src_part").size)
}
test(s"$version: getPartitionsByFilter") {
// Only one partition [1, 1] for key2 == 1
- val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
+ val result = client.getPartitionsByFilter("default", "src_part",
Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
// Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.