You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/06 19:53:40 UTC
spark git commit: [SPARK-22214][SQL] Refactor the list hive partitions code
Repository: spark
Updated Branches:
refs/heads/master c7b46d4d8 -> 08b204fd2
[SPARK-22214][SQL] Refactor the list hive partitions code
## What changes were proposed in this pull request?
In this PR we make a few changes to the list hive partitions code, to make the code more extensible.
The following changes are made:
1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of `shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, we previously always called `listPartitionsByFilter` when the config `metastorePartitionPruning` was enabled; we should instead call `listPartitions` when `partitionPruningPred` is empty;
3. We should use sessionCatalog instead of SharedState.externalCatalog in `HiveTableScanExec`.
## How was this patch tested?
Tested by the existing test cases. Since this is a code refactor, no regression or behavior change is expected.
Author: Xingbo Jiang <xi...@databricks.com>
Closes #19444 from jiangxb1987/hivePartitions.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08b204fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08b204fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08b204fd
Branch: refs/heads/master
Commit: 08b204fd2c731e87d3bc2cc0bccb6339ef7e3a6e
Parents: c7b46d4
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Fri Oct 6 12:53:35 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 6 12:53:35 2017 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/catalog/interface.scala | 5 ++++
.../spark/sql/hive/client/HiveClientImpl.scala | 7 ++---
.../sql/hive/execution/HiveTableScanExec.scala | 28 +++++++++-----------
3 files changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fe2af91..975b084 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -405,6 +405,11 @@ object CatalogTypes {
* Specifications of a table partition. Mapping column name to column value.
*/
type TablePartitionSpec = Map[String, String]
+
+ /**
+ * Initialize an empty spec.
+ */
+ lazy val emptyTablePartitionSpec: TablePartitionSpec = Map.empty[String, String]
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 66165c7..a01c312 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -638,12 +638,13 @@ private[hive] class HiveClientImpl(
table: CatalogTable,
spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
val hiveTable = toHiveTable(table, Some(userName))
- val parts = spec match {
- case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
+ val partSpec = spec match {
+ case None => CatalogTypes.emptyTablePartitionSpec
case Some(s) =>
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
- client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+ s
}
+ val parts = client.getPartitions(hiveTable, partSpec.asJava).asScala.map(fromHivePartition)
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}
http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 48d0b4a..4f8dab9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -162,21 +162,19 @@ case class HiveTableScanExec(
// exposed for tests
@transient lazy val rawPartitions = {
- val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
- // Retrieve the original attributes based on expression ID so that capitalization matches.
- val normalizedFilters = partitionPruningPred.map(_.transform {
- case a: AttributeReference => originalAttributes(a)
- })
- sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
- relation.tableMeta.database,
- relation.tableMeta.identifier.table,
- normalizedFilters,
- sparkSession.sessionState.conf.sessionLocalTimeZone)
- } else {
- sparkSession.sharedState.externalCatalog.listPartitions(
- relation.tableMeta.database,
- relation.tableMeta.identifier.table)
- }
+ val prunedPartitions =
+ if (sparkSession.sessionState.conf.metastorePartitionPruning &&
+ partitionPruningPred.size > 0) {
+ // Retrieve the original attributes based on expression ID so that capitalization matches.
+ val normalizedFilters = partitionPruningPred.map(_.transform {
+ case a: AttributeReference => originalAttributes(a)
+ })
+ sparkSession.sessionState.catalog.listPartitionsByFilter(
+ relation.tableMeta.identifier,
+ normalizedFilters)
+ } else {
+ sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+ }
prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org