You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/09/23 09:30:49 UTC

[spark] branch master updated: [SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0942ea9f352 [SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour
0942ea9f352 is described below

commit 0942ea9f352f5fdf413dad750a672d15c7257776
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Fri Sep 23 17:30:19 2022 +0800

    [SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour
    
    ### What changes were proposed in this pull request?
    When converting a native table metadata representation `CatalogTable` to `HiveTable` make sure bucket spec uses an existing column.
    
    ### Does this PR introduce _any_ user-facing change?
    Hive metastore seems to be not case preserving with columns but case preserving with bucket spec, which means the following table creation:
    ```
      CREATE TABLE t(
        c STRING,
        B_C STRING
      )
      PARTITIONED BY (p_c STRING)
      CLUSTERED BY (B_C) INTO 4 BUCKETS
      STORED AS PARQUET
    ```
    followed by a query:
    ```
      SELECT * FROM t
    ```
    fails with:
    ```
    Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]
    ```
    
    ### Why are the changes needed?
    Bug fix.
    
    ### How was this patch tested?
    Added new UT.
    
    Closes #36027 from peter-toth/SPARK-38717-handle-upper-case-bucket-spec.
    
    Lead-authored-by: Peter Toth <pe...@gmail.com>
    Co-authored-by: Peter Toth <pt...@cloudera.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/hive/HiveExternalCatalog.scala       |  6 ++--
 .../apache/spark/sql/hive/client/HiveClient.scala  | 20 +++++++++--
 .../spark/sql/hive/client/HiveClientImpl.scala     | 40 ++++++++++++++++------
 .../spark/sql/hive/client/HiveClientSuite.scala    |  4 +--
 .../hive/client/HivePartitionFilteringSuite.scala  |  8 ++---
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 17 +++++++++
 6 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 00803e3fbe5..084c7b72104 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1296,11 +1296,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       predicates: Seq[Expression],
       defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
-    val rawTable = getRawTable(db, table)
-    val catalogTable = restoreTableMetadata(rawTable)
+    val rawHiveTable = client.getRawHiveTable(db, table)
+    val catalogTable = restoreTableMetadata(rawHiveTable.toCatalogTable)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
     val clientPrunedPartitions =
-      client.getPartitionsByFilter(rawTable, predicates).map { part =>
+      client.getPartitionsByFilter(rawHiveTable, predicates).map { part =>
         part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
       }
     prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8b2bf310072..58cacfa1d5d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -26,6 +26,11 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.types.StructType
 
 
+private[hive] trait RawHiveTable {
+  def rawTable: Object
+  def toCatalogTable: CatalogTable
+}
+
 /**
  * An externally visible interface to the Hive client.  This interface is shared across both the
  * internal and external classloaders for a given version of Hive and thus must expose only
@@ -93,6 +98,15 @@ private[hive] trait HiveClient {
   /** Returns the metadata for the specified table or None if it doesn't exist. */
   def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
 
+  /** Returns the specified catalog and Hive table, or throws `NoSuchTableException`. */
+  final def getRawHiveTable(dbName: String, tableName: String): RawHiveTable = {
+    getRawHiveTableOption(dbName, tableName)
+      .getOrElse(throw new NoSuchTableException(dbName, tableName))
+  }
+
+  /** Returns the metadata for the specified catalog and Hive table or None if it doesn't exist. */
+  def getRawHiveTableOption(dbName: String, tableName: String): Option[RawHiveTable]
+
   /** Returns metadata of existing permanent tables/views for given names. */
   def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]
 
@@ -203,12 +217,12 @@ private[hive] trait HiveClient {
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = {
-    getPartitionOption(getTable(db, table), spec)
+    getPartitionOption(getRawHiveTable(db, table), spec)
   }
 
   /** Returns the specified partition or None if it does not exist. */
   def getPartitionOption(
-      table: CatalogTable,
+      table: RawHiveTable,
       spec: TablePartitionSpec): Option[CatalogTablePartition]
 
   /**
@@ -222,7 +236,7 @@ private[hive] trait HiveClient {
 
   /** Returns partitions filtered by predicates for the given table. */
   def getPartitionsByFilter(
-      catalogTable: CatalogTable,
+      catalogTable: RawHiveTable,
       predicates: Seq[Expression]): Seq[CatalogTablePartition]
 
   /** Loads a static partition into an existing table. */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 94663e5c2ec..61951cde8d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -102,6 +102,10 @@ private[hive] class HiveClientImpl(
   extends HiveClient
   with Logging {
 
+  private class RawHiveTableImpl(override val rawTable: HiveTable) extends RawHiveTable {
+    override lazy val toCatalogTable = convertHiveTableToCatalogTable(rawTable)
+  }
+
   import HiveClientImpl._
 
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
@@ -435,6 +439,13 @@ private[hive] class HiveClientImpl(
     getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
   }
 
+  override def getRawHiveTableOption(
+      dbName: String,
+      tableName: String): Option[RawHiveTable] = withHiveState {
+    logDebug(s"Looking up $dbName.$tableName")
+    getRawTableOption(dbName, tableName).map(new RawHiveTableImpl(_))
+  }
+
   private def convertHiveTableToCatalogTable(h: HiveTable): CatalogTable = {
     // Note: Hive separates partition columns and the schema, but for us the
     // partition columns are part of the schema
@@ -687,13 +698,14 @@ private[hive] class HiveClientImpl(
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
     require(specs.size == newSpecs.size, "number of old and new partition specs differ")
-    val catalogTable = getTable(db, table)
-    val hiveTable = toHiveTable(catalogTable, Some(userName))
+    val rawHiveTable = getRawHiveTable(db, table)
+    val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
+    hiveTable.setOwner(userName)
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
       if (shim.getPartition(client, hiveTable, newSpec.asJava, false) != null) {
         throw new PartitionAlreadyExistsException(db, table, newSpec)
       }
-      val hivePart = getPartitionOption(catalogTable, oldSpec)
+      val hivePart = getPartitionOption(rawHiveTable, oldSpec)
         .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
         .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }
       shim.renamePartition(client, hiveTable, oldSpec.asJava, hivePart)
@@ -711,7 +723,10 @@ private[hive] class HiveClientImpl(
     val original = state.getCurrentDatabase
     try {
       setCurrentDatabaseRaw(db)
-      val hiveTable = toHiveTable(getTable(db, table), Some(userName))
+      val hiveTable = withHiveState {
+        getRawTableOption(db, table).getOrElse(throw new NoSuchTableException(db, table))
+      }
+      hiveTable.setOwner(userName)
       shim.alterPartitions(client, table, newParts.map { toHivePartition(_, hiveTable) }.asJava)
     } finally {
       state.setCurrentDatabase(original)
@@ -740,11 +755,12 @@ private[hive] class HiveClientImpl(
   }
 
   override def getPartitionOption(
-      table: CatalogTable,
+      rawHiveTable: RawHiveTable,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table, Some(userName))
+    val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition).map(fromHivePartition(_, table.storage.locationUri))
+    Option(hivePartition)
+      .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
   }
 
   override def getPartitions(
@@ -775,11 +791,13 @@ private[hive] class HiveClientImpl(
   }
 
   override def getPartitionsByFilter(
-      table: CatalogTable,
+      rawHiveTable: RawHiveTable,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
-    val hiveTable = toHiveTable(table, Some(userName))
-    val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
-      .map(fromHivePartition(_, table.storage.locationUri))
+    val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
+    hiveTable.setOwner(userName)
+    val parts =
+      shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable)
+        .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 22698b91cb6..2645d411b01 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -413,7 +413,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
 
   test("getPartitionsByFilter") {
     // Only one partition [1, 1] for key2 == 1
-    val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
+    val result = client.getPartitionsByFilter(client.getRawHiveTable("default", "src_part"),
       Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
 
     // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
@@ -437,7 +437,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
 
   test("getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
     val partition = client.getPartitionOption(
-      client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
+      client.getRawHiveTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
     assert(partition.isDefined)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index e9ab8edf9ad..efbf0b0b8be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -121,7 +121,7 @@ class HivePartitionFilteringSuite(version: String)
   test(s"getPartitionsByFilter returns all partitions when $fallbackKey=true") {
     withSQLConf(fallbackKey -> "true") {
       val filteredPartitions = clientWithoutDirectSql.getPartitionsByFilter(
-        clientWithoutDirectSql.getTable("default", "test"),
+        clientWithoutDirectSql.getRawHiveTable("default", "test"),
         Seq(attr("ds") === 20170101))
 
       assert(filteredPartitions.size == testPartitionCount)
@@ -132,7 +132,7 @@ class HivePartitionFilteringSuite(version: String)
     withSQLConf(fallbackKey -> "false") {
       val e = intercept[RuntimeException](
         clientWithoutDirectSql.getPartitionsByFilter(
-          clientWithoutDirectSql.getTable("default", "test"),
+          clientWithoutDirectSql.getRawHiveTable("default", "test"),
           Seq(attr("ds") === 20170101)))
       assert(e.getMessage.contains("Caught Hive MetaException"))
     }
@@ -628,7 +628,7 @@ class HivePartitionFilteringSuite(version: String)
   test(s"SPARK-35437: getPartitionsByFilter: ds=20170101 when $fallbackKey=true") {
     withSQLConf(fallbackKey -> "true", pruningFastFallback -> "true") {
       val filteredPartitions = clientWithoutDirectSql.getPartitionsByFilter(
-        clientWithoutDirectSql.getTable("default", "test"),
+        clientWithoutDirectSql.getRawHiveTable("default", "test"),
         Seq(attr("ds") === 20170101))
 
       assert(filteredPartitions.size == 1 * hValue.size * chunkValue.size *
@@ -705,7 +705,7 @@ class HivePartitionFilteringSuite(version: String)
       filterExpr: Expression,
       expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String], Seq[String], Seq[String])],
       transform: Expression => Expression): Unit = {
-    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+    val filteredPartitions = client.getPartitionsByFilter(client.getRawHiveTable("default", "test"),
       Seq(
         transform(filterExpr)
       ))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index faefbfd1d6d..62a48a660c8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2646,6 +2646,23 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-38717: Handle Hive's bucket spec case preserving behaviour") {
+    withTable("t") {
+      sql(
+        s"""
+           |CREATE TABLE t(
+           |  c STRING,
+           |  B_C STRING
+           |)
+           |PARTITIONED BY (p_c STRING)
+           |CLUSTERED BY (B_C) INTO 4 BUCKETS
+           |STORED AS PARQUET
+           |""".stripMargin)
+      val df = sql("SELECT * FROM t")
+      checkAnswer(df, Seq.empty[Row])
+    }
+  }
 }
 
 @SlowHiveTest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org