Posted to commits@spark.apache.org by li...@apache.org on 2017/10/06 19:53:40 UTC

spark git commit: [SPARK-22214][SQL] Refactor the list hive partitions code

Repository: spark
Updated Branches:
  refs/heads/master c7b46d4d8 -> 08b204fd2


[SPARK-22214][SQL] Refactor the list hive partitions code

## What changes were proposed in this pull request?

In this PR we make a few changes to the partition-listing code for Hive tables, to make it more extensible.
The following changes are made (a sketch of the combined result follows the list):
1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of `shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, previously we always called `listPartitionsByFilter` when the config `metastorePartitionPruning` was enabled; now we call `listPartitions` instead when `partitionPruningPred` is empty, since there are no predicates to push down;
3. Use `sessionCatalog` instead of `SharedState.externalCatalog` in `HiveTableScanExec`.
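
For reference, a minimal sketch of the net decision logic after changes 2 and 3 (names abbreviated from the `HiveTableScanExec` hunk below; `catalog` stands for `sparkSession.sessionState.catalog`):

```scala
// Prune through the metastore only when pruning is enabled AND there are
// partition predicates to push down; otherwise list all partitions.
val prunedPartitions =
  if (conf.metastorePartitionPruning && partitionPruningPred.nonEmpty) {
    catalog.listPartitionsByFilter(tableIdentifier, normalizedFilters)
  } else {
    catalog.listPartitions(tableIdentifier)
  }
```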

## How was this patch tested?

Tested by existing test cases. Since this is a pure refactor, no regression or behavior change is expected.

Author: Xingbo Jiang <xi...@databricks.com>

Closes #19444 from jiangxb1987/hivePartitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08b204fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08b204fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08b204fd

Branch: refs/heads/master
Commit: 08b204fd2c731e87d3bc2cc0bccb6339ef7e3a6e
Parents: c7b46d4
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Fri Oct 6 12:53:35 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 6 12:53:35 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  5 ++++
 .../spark/sql/hive/client/HiveClientImpl.scala  |  7 ++---
 .../sql/hive/execution/HiveTableScanExec.scala  | 28 +++++++++-----------
 3 files changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index fe2af91..975b084 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -405,6 +405,11 @@ object CatalogTypes {
    * Specifications of a table partition. Mapping column name to column value.
    */
   type TablePartitionSpec = Map[String, String]
+
+  /**
+   * Initialize an empty spec.
+   */
+  lazy val emptyTablePartitionSpec: TablePartitionSpec = Map.empty[String, String]
 }
 
 /**
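
The new `emptyTablePartitionSpec` helper gives the "no constraints" case a name instead of scattering `Map.empty` literals. A trivial usage sketch (not part of the patch):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTypes

// An empty spec constrains no partition columns, i.e. it matches every
// partition of the table.
val spec: CatalogTypes.TablePartitionSpec = CatalogTypes.emptyTablePartitionSpec
assert(spec.isEmpty)
```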

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 66165c7..a01c312 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -638,12 +638,13 @@ private[hive] class HiveClientImpl(
       table: CatalogTable,
       spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = toHiveTable(table, Some(userName))
-    val parts = spec match {
-      case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
+    val partSpec = spec match {
+      case None => CatalogTypes.emptyTablePartitionSpec
       case Some(s) =>
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
-        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+        s
     }
+    val parts = client.getPartitions(hiveTable, partSpec.asJava).asScala.map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
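
Both branches now funnel into a single `client.getPartitions` call: a `None` spec becomes the empty spec, which Hive treats as "return all partitions". A hypothetical caller, for illustration only (`hiveClient` is an assumed `HiveClientImpl` instance, `table` a `CatalogTable`):

```scala
// Both calls take the same code path inside getPartitions; the empty spec
// behaves as shim.getAllPartitions did before this change.
val allParts      = hiveClient.getPartitions(table, None)
val matchingParts = hiveClient.getPartitions(table, Some(Map("ds" -> "2017-10-06")))
```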

http://git-wip-us.apache.org/repos/asf/spark/blob/08b204fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 48d0b4a..4f8dab9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -162,21 +162,19 @@ case class HiveTableScanExec(
 
   // exposed for tests
   @transient lazy val rawPartitions = {
-    val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
-      // Retrieve the original attributes based on expression ID so that capitalization matches.
-      val normalizedFilters = partitionPruningPred.map(_.transform {
-        case a: AttributeReference => originalAttributes(a)
-      })
-      sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
-        relation.tableMeta.database,
-        relation.tableMeta.identifier.table,
-        normalizedFilters,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-    } else {
-      sparkSession.sharedState.externalCatalog.listPartitions(
-        relation.tableMeta.database,
-        relation.tableMeta.identifier.table)
-    }
+    val prunedPartitions =
+      if (sparkSession.sessionState.conf.metastorePartitionPruning &&
+          partitionPruningPred.size > 0) {
+        // Retrieve the original attributes based on expression ID so that capitalization matches.
+        val normalizedFilters = partitionPruningPred.map(_.transform {
+          case a: AttributeReference => originalAttributes(a)
+        })
+        sparkSession.sessionState.catalog.listPartitionsByFilter(
+          relation.tableMeta.identifier,
+          normalizedFilters)
+      } else {
+        sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
+      }
     prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
   }
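
Note the tightened guard: metastore-side pruning now requires both the config and a non-empty `partitionPruningPred`, so a predicate-free scan of a partitioned table no longer goes through `listPartitionsByFilter`. A hedged end-to-end illustration (`partitioned_tbl` is a hypothetical Hive table partitioned by `ds`):

```scala
// Enable metastore partition pruning (config key per SQLConf).
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")

// With a partition predicate, listPartitionsByFilter fetches only the
// matching partitions from the metastore.
spark.sql("SELECT * FROM partitioned_tbl WHERE ds = '2017-10-06'").show()

// Without a predicate, listPartitions is called directly instead of the
// filter path the old code always took.
spark.sql("SELECT * FROM partitioned_tbl").show()
```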
 

