Posted to commits@spark.apache.org by we...@apache.org on 2022/09/23 09:30:49 UTC
[spark] branch master updated: [SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0942ea9f352 [SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour
0942ea9f352 is described below
commit 0942ea9f352f5fdf413dad750a672d15c7257776
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Fri Sep 23 17:30:19 2022 +0800
[SPARK-38717][SQL] Handle Hive's bucket spec case preserving behaviour
### What changes were proposed in this pull request?
When converting the native table metadata representation (`CatalogTable`) to a `HiveTable`, make sure the bucket spec references an existing column name.
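Conceptually, the fix resolves each bucket column against the table's columns case-insensitively and uses the casing the metastore stores. A minimal Scala sketch of that idea (the helper name and simplified signature are illustrative, not the exact change in this diff):
```
// Illustrative only: map each bucket spec column to the casing the
// metastore actually stores, so Hive's validation can find it.
def normalizeBucketColumns(
    bucketColumns: Seq[String],
    hiveColumnNames: Seq[String]): Seq[String] = {
  bucketColumns.map { col =>
    hiveColumnNames.find(_.equalsIgnoreCase(col)).getOrElse(col)
  }
}
```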
### Does this PR introduce _any_ user-facing change?
The Hive metastore does not seem to preserve the case of column names, but it does preserve the case of bucket spec columns, which means the following table creation:
```
CREATE TABLE t(
  c STRING,
  B_C STRING
)
PARTITIONED BY (p_c STRING)
CLUSTERED BY (B_C) INTO 4 BUCKETS
STORED AS PARQUET
```
followed by a query:
```
SELECT * FROM t
```
fails with:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Bucket columns B_C is not part of the table columns ([FieldSchema(name:c, type:string, comment:null), FieldSchema(name:b_c, type:string, comment:null)]
```
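The root cause is visible in the error message itself: Hive lower-cases the stored column names but keeps the bucket column's original casing, so its case-sensitive membership check fails. A small illustration of that mismatch (values taken from the error above; the check is a simplification of Hive's validation):
```
val hiveColumns = Seq("c", "b_c") // FieldSchema names, lower-cased by the metastore
val bucketColumns = Seq("B_C")    // bucket spec column, case preserved
// A case-sensitive lookup does not find "B_C" among the stored columns:
val missing = bucketColumns.filterNot(hiveColumns.contains)
assert(missing == Seq("B_C"))     // Hive raises a HiveException for these
```
With this fix, the same `SELECT` succeeds and returns an empty result, as the new test at the end of this diff verifies.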
### Why are the changes needed?
Bug fix.
### How was this patch tested?
Added new UT.
Closes #36027 from peter-toth/SPARK-38717-handle-upper-case-bucket-spec.
Lead-authored-by: Peter Toth <pe...@gmail.com>
Co-authored-by: Peter Toth <pt...@cloudera.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/hive/HiveExternalCatalog.scala | 6 ++--
.../apache/spark/sql/hive/client/HiveClient.scala | 20 +++++++++--
.../spark/sql/hive/client/HiveClientImpl.scala | 40 ++++++++++++++++------
.../spark/sql/hive/client/HiveClientSuite.scala | 4 +--
.../hive/client/HivePartitionFilteringSuite.scala | 8 ++---
.../spark/sql/hive/execution/SQLQuerySuite.scala | 17 +++++++++
6 files changed, 72 insertions(+), 23 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 00803e3fbe5..084c7b72104 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1296,11 +1296,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
predicates: Seq[Expression],
defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
- val rawTable = getRawTable(db, table)
- val catalogTable = restoreTableMetadata(rawTable)
+ val rawHiveTable = client.getRawHiveTable(db, table)
+ val catalogTable = restoreTableMetadata(rawHiveTable.toCatalogTable)
val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
val clientPrunedPartitions =
- client.getPartitionsByFilter(rawTable, predicates).map { part =>
+ client.getPartitionsByFilter(rawHiveTable, predicates).map { part =>
part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
}
prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8b2bf310072..58cacfa1d5d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -26,6 +26,11 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
+private[hive] trait RawHiveTable {
+ def rawTable: Object
+ def toCatalogTable: CatalogTable
+}
+
/**
* An externally visible interface to the Hive client. This interface is shared across both the
* internal and external classloaders for a given version of Hive and thus must expose only
@@ -93,6 +98,15 @@ private[hive] trait HiveClient {
/** Returns the metadata for the specified table or None if it doesn't exist. */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
+ /** Returns the specified catalog and Hive table, or throws `NoSuchTableException`. */
+ final def getRawHiveTable(dbName: String, tableName: String): RawHiveTable = {
+ getRawHiveTableOption(dbName, tableName)
+ .getOrElse(throw new NoSuchTableException(dbName, tableName))
+ }
+
+ /** Returns the metadata for the specified catalog and Hive table or None if it doesn't exist. */
+ def getRawHiveTableOption(dbName: String, tableName: String): Option[RawHiveTable]
+
/** Returns metadata of existing permanent tables/views for given names. */
def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable]
@@ -203,12 +217,12 @@ private[hive] trait HiveClient {
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = {
- getPartitionOption(getTable(db, table), spec)
+ getPartitionOption(getRawHiveTable(db, table), spec)
}
/** Returns the specified partition or None if it does not exist. */
def getPartitionOption(
- table: CatalogTable,
+ table: RawHiveTable,
spec: TablePartitionSpec): Option[CatalogTablePartition]
/**
@@ -222,7 +236,7 @@ private[hive] trait HiveClient {
/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(
- catalogTable: CatalogTable,
+ catalogTable: RawHiveTable,
predicates: Seq[Expression]): Seq[CatalogTablePartition]
/** Loads a static partition into an existing table. */
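For context on the API above: `RawHiveTable` lets callers fetch the metastore object once and reuse it, converting to a `CatalogTable` only on demand. A hypothetical call site (the database/table names and `predicates` are placeholders; the method signatures follow the trait above):
```
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hive.client.HiveClient

// Hypothetical usage: one metastore lookup serves both calls below.
def prunedPartitions(
    client: HiveClient,
    predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
  val rawHiveTable = client.getRawHiveTable("default", "src_part")
  // Partition pruning operates on the raw Hive table directly...
  val parts = client.getPartitionsByFilter(rawHiveTable, predicates)
  // ...while the CatalogTable view is built lazily and cached on first use.
  val location = rawHiveTable.toCatalogTable.storage.locationUri
  parts
}
```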
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 94663e5c2ec..61951cde8d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -102,6 +102,10 @@ private[hive] class HiveClientImpl(
extends HiveClient
with Logging {
+ private class RawHiveTableImpl(override val rawTable: HiveTable) extends RawHiveTable {
+ override lazy val toCatalogTable = convertHiveTableToCatalogTable(rawTable)
+ }
+
import HiveClientImpl._
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
@@ -435,6 +439,13 @@ private[hive] class HiveClientImpl(
getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
}
+ override def getRawHiveTableOption(
+ dbName: String,
+ tableName: String): Option[RawHiveTable] = withHiveState {
+ logDebug(s"Looking up $dbName.$tableName")
+ getRawTableOption(dbName, tableName).map(new RawHiveTableImpl(_))
+ }
+
private def convertHiveTableToCatalogTable(h: HiveTable): CatalogTable = {
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
@@ -687,13 +698,14 @@ private[hive] class HiveClientImpl(
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
- val catalogTable = getTable(db, table)
- val hiveTable = toHiveTable(catalogTable, Some(userName))
+ val rawHiveTable = getRawHiveTable(db, table)
+ val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
+ hiveTable.setOwner(userName)
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
if (shim.getPartition(client, hiveTable, newSpec.asJava, false) != null) {
throw new PartitionAlreadyExistsException(db, table, newSpec)
}
- val hivePart = getPartitionOption(catalogTable, oldSpec)
+ val hivePart = getPartitionOption(rawHiveTable, oldSpec)
.map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
.getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }
shim.renamePartition(client, hiveTable, oldSpec.asJava, hivePart)
@@ -711,7 +723,10 @@ private[hive] class HiveClientImpl(
val original = state.getCurrentDatabase
try {
setCurrentDatabaseRaw(db)
- val hiveTable = toHiveTable(getTable(db, table), Some(userName))
+ val hiveTable = withHiveState {
+ getRawTableOption(db, table).getOrElse(throw new NoSuchTableException(db, table))
+ }
+ hiveTable.setOwner(userName)
shim.alterPartitions(client, table, newParts.map { toHivePartition(_, hiveTable) }.asJava)
} finally {
state.setCurrentDatabase(original)
@@ -740,11 +755,12 @@ private[hive] class HiveClientImpl(
}
override def getPartitionOption(
- table: CatalogTable,
+ rawHiveTable: RawHiveTable,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table, Some(userName))
+ val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
- Option(hivePartition).map(fromHivePartition(_, table.storage.locationUri))
+ Option(hivePartition)
+ .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
}
override def getPartitions(
@@ -775,11 +791,13 @@ private[hive] class HiveClientImpl(
}
override def getPartitionsByFilter(
- table: CatalogTable,
+ rawHiveTable: RawHiveTable,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
- val hiveTable = toHiveTable(table, Some(userName))
- val parts = shim.getPartitionsByFilter(client, hiveTable, predicates, table)
- .map(fromHivePartition(_, table.storage.locationUri))
+ val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
+ hiveTable.setOwner(userName)
+ val parts =
+ shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable)
+ .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 22698b91cb6..2645d411b01 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -413,7 +413,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
test("getPartitionsByFilter") {
// Only one partition [1, 1] for key2 == 1
- val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
+ val result = client.getPartitionsByFilter(client.getRawHiveTable("default", "src_part"),
Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
// Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
@@ -437,7 +437,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
test("getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
val partition = client.getPartitionOption(
- client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
+ client.getRawHiveTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
assert(partition.isDefined)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index e9ab8edf9ad..efbf0b0b8be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -121,7 +121,7 @@ class HivePartitionFilteringSuite(version: String)
test(s"getPartitionsByFilter returns all partitions when $fallbackKey=true") {
withSQLConf(fallbackKey -> "true") {
val filteredPartitions = clientWithoutDirectSql.getPartitionsByFilter(
- clientWithoutDirectSql.getTable("default", "test"),
+ clientWithoutDirectSql.getRawHiveTable("default", "test"),
Seq(attr("ds") === 20170101))
assert(filteredPartitions.size == testPartitionCount)
@@ -132,7 +132,7 @@ class HivePartitionFilteringSuite(version: String)
withSQLConf(fallbackKey -> "false") {
val e = intercept[RuntimeException](
clientWithoutDirectSql.getPartitionsByFilter(
- clientWithoutDirectSql.getTable("default", "test"),
+ clientWithoutDirectSql.getRawHiveTable("default", "test"),
Seq(attr("ds") === 20170101)))
assert(e.getMessage.contains("Caught Hive MetaException"))
}
@@ -628,7 +628,7 @@ class HivePartitionFilteringSuite(version: String)
test(s"SPARK-35437: getPartitionsByFilter: ds=20170101 when $fallbackKey=true") {
withSQLConf(fallbackKey -> "true", pruningFastFallback -> "true") {
val filteredPartitions = clientWithoutDirectSql.getPartitionsByFilter(
- clientWithoutDirectSql.getTable("default", "test"),
+ clientWithoutDirectSql.getRawHiveTable("default", "test"),
Seq(attr("ds") === 20170101))
assert(filteredPartitions.size == 1 * hValue.size * chunkValue.size *
@@ -705,7 +705,7 @@ class HivePartitionFilteringSuite(version: String)
filterExpr: Expression,
expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String], Seq[String], Seq[String])],
transform: Expression => Expression): Unit = {
- val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+ val filteredPartitions = client.getPartitionsByFilter(client.getRawHiveTable("default", "test"),
Seq(
transform(filterExpr)
))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index faefbfd1d6d..62a48a660c8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2646,6 +2646,23 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
}
}
}
+
+ test("SPARK-38717: Handle Hive's bucket spec case preserving behaviour") {
+ withTable("t") {
+ sql(
+ s"""
+ |CREATE TABLE t(
+ | c STRING,
+ | B_C STRING
+ |)
+ |PARTITIONED BY (p_c STRING)
+ |CLUSTERED BY (B_C) INTO 4 BUCKETS
+ |STORED AS PARQUET
+ |""".stripMargin)
+ val df = sql("SELECT * FROM t")
+ checkAnswer(df, Seq.empty[Row])
+ }
+ }
}
@SlowHiveTest