You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/03 05:41:32 UTC
[kyuubi] branch master updated: [KYUUBI #4237] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new dcf863ad7 [KYUUBI #4237] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
dcf863ad7 is described below
commit dcf863ad7c8ff5c179a225c136d3d4be46798eed
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Fri Feb 3 05:41:22 2023 +0000
[KYUUBI #4237] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
### _Why are the changes needed?_
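Fixes #4222: after a batch write, table statistics are updated through the connector's `HiveTableCatalog` instead of the Spark session catalog.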
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4237 from jiaoqingbo/kyuubi4222.
Closes #4237
7677d69f [jiaoqingbo] code review
538d436a [jiaoqingbo] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
Authored-by: jiaoqingbo <11...@qq.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../connector/hive/write/HiveBatchWrite.scala | 22 ++++++++++++++++++----
.../spark/connector/hive/write/HiveWrite.scala | 2 ++
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index ba89ec715..c4e473ff2 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -33,11 +33,13 @@ import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
import org.apache.spark.sql.types.StringType
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
+import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
import org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
class HiveBatchWrite(
+ sparkSession: SparkSession,
table: CatalogTable,
+ hiveTableCatalog: HiveTableCatalog,
tmpLocation: Option[Path],
partition: Map[String, Option[String]],
partitionColumnNames: Seq[String],
@@ -66,10 +68,22 @@ class HiveBatchWrite(
deleteExternalTmpPath(hadoopConf)
}
- val sparkSession = SparkSession.active
// un-cache this table.
- CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
- CommandUtils.updateTableStats(sparkSession, table)
+ hiveTableCatalog.catalog.invalidateCachedTable(table.identifier)
+
+ val catalog = hiveTableCatalog.catalog
+ if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+ val newTable = catalog.getTableMetadata(table.identifier)
+ val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
+ val newStats = CatalogStatistics(sizeInBytes = newSize)
+ catalog.alterTableStats(table.identifier, Some(newStats))
+ } else if (table.stats.nonEmpty) {
+ catalog.alterTableStats(table.identifier, None)
+ } else {
+ // In other cases, we still need to invalidate the table relation cache.
+ catalog.refreshTable(table.identifier)
+ }
+
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 50ab2b5ed..486a7aa22 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -102,7 +102,9 @@ case class HiveWrite(
committer.setupJob(job)
new HiveBatchWrite(
+ sparkSession,
table,
+ hiveTableCatalog,
Some(tmpLocation),
partition,
partitionColumnNames,