Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/03 05:41:32 UTC

[kyuubi] branch master updated: [KYUUBI #4237] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new dcf863ad7 [KYUUBI #4237] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
dcf863ad7 is described below

commit dcf863ad7c8ff5c179a225c136d3d4be46798eed
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Fri Feb 3 05:41:22 2023 +0000

    [KYUUBI #4237] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
    
    ### _Why are the changes needed?_
    
    Fix #4222. `HiveBatchWrite.commit` previously called `SparkSession.active` and
    delegated to `CommandUtils.updateTableStats`, which updates statistics through
    the Spark session catalog; for tables owned by the Kyuubi Hive connector this
    targets the wrong catalog. Cache invalidation and the stats update now go
    through the connector's own `HiveTableCatalog` (a simplified sketch of the new
    logic follows the diff below).
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4237 from jiaoqingbo/kyuubi4222.
    
    Closes #4237
    
    7677d69f [jiaoqingbo] code review
    538d436a [jiaoqingbo] [Kyuubi #4222] Use hiveTableCatalog to updateTableStats instead of sessionCatalog
    
    Authored-by: jiaoqingbo <11...@qq.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../connector/hive/write/HiveBatchWrite.scala      | 22 ++++++++++++++++++----
 .../spark/connector/hive/write/HiveWrite.scala     |  2 ++
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index ba89ec715..c4e473ff2 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -33,11 +33,13 @@ import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
 import org.apache.spark.sql.types.StringType
 
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
+import org.apache.kyuubi.spark.connector.hive.{HiveTableCatalog, KyuubiHiveConnectorException}
 import org.apache.kyuubi.spark.connector.hive.write.HiveWriteHelper.getPartitionSpec
 
 class HiveBatchWrite(
+    sparkSession: SparkSession,
     table: CatalogTable,
+    hiveTableCatalog: HiveTableCatalog,
     tmpLocation: Option[Path],
     partition: Map[String, Option[String]],
     partitionColumnNames: Seq[String],
@@ -66,10 +68,22 @@ class HiveBatchWrite(
       deleteExternalTmpPath(hadoopConf)
     }
 
-    val sparkSession = SparkSession.active
     // un-cache this table.
-    CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
-    CommandUtils.updateTableStats(sparkSession, table)
+    hiveTableCatalog.catalog.invalidateCachedTable(table.identifier)
+
+    val catalog = hiveTableCatalog.catalog
+    if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
+      val newTable = catalog.getTableMetadata(table.identifier)
+      val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
+      val newStats = CatalogStatistics(sizeInBytes = newSize)
+      catalog.alterTableStats(table.identifier, Some(newStats))
+    } else if (table.stats.nonEmpty) {
+      catalog.alterTableStats(table.identifier, None)
+    } else {
+      // In other cases, we still need to invalidate the table relation cache.
+      catalog.refreshTable(table.identifier)
+    }
+
   }
 
   override def abort(messages: Array[WriterCommitMessage]): Unit = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 50ab2b5ed..486a7aa22 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -102,7 +102,9 @@ case class HiveWrite(
     committer.setupJob(job)
 
     new HiveBatchWrite(
+      sparkSession,
       table,
+      hiveTableCatalog,
       Some(tmpLocation),
       partition,
       partitionColumnNames,
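
For readers skimming the diff: the substance of the change is that cache
invalidation and the post-commit statistics update now run against the
connector's own HiveTableCatalog rather than whatever catalog
SparkSession.active resolves to. The sketch below restates the new branch
logic against a simplified, hypothetical trait; OwningCatalog, TableStats,
and updateStatsAfterWrite are illustrative names, not the real Spark or
Kyuubi APIs.

    object StatsUpdateSketch {

      case class TableStats(sizeInBytes: BigInt)
      case class Table(name: String, stats: Option[TableStats])

      // Hypothetical stand-in for the catalog that owns the table
      // (hiveTableCatalog.catalog in the patch); not the real API.
      trait OwningCatalog {
        def getTableMetadata(name: String): Table
        def alterTableStats(name: String, stats: Option[TableStats]): Unit
        def refreshTable(name: String): Unit
      }

      def updateStatsAfterWrite(
          catalog: OwningCatalog,
          table: Table,
          autoSizeUpdateEnabled: Boolean,
          calculateTotalSize: Table => BigInt): Unit = {
        if (autoSizeUpdateEnabled) {
          // spark.sql.statistics.size.autoUpdate.enabled is on:
          // recompute the size and persist fresh stats in the owning catalog.
          val newSize = calculateTotalSize(catalog.getTableMetadata(table.name))
          catalog.alterTableStats(table.name, Some(TableStats(newSize)))
        } else if (table.stats.nonEmpty) {
          // Auto-update is off and old stats exist: they are stale now, drop them.
          catalog.alterTableStats(table.name, None)
        } else {
          // Nothing to alter, but the cached relation still needs a refresh.
          catalog.refreshTable(table.name)
        }
      }

      def main(args: Array[String]): Unit = {
        val cat = new OwningCatalog {
          def getTableMetadata(name: String): Table = Table(name, None)
          def alterTableStats(name: String, stats: Option[TableStats]): Unit =
            println(s"alterTableStats($name, $stats)")
          def refreshTable(name: String): Unit = println(s"refreshTable($name)")
        }
        // Prints: alterTableStats(db.t, Some(TableStats(1024)))
        updateStatsAfterWrite(cat, Table("db.t", None),
          autoSizeUpdateEnabled = true, _ => BigInt(1024))
      }
    }

Whichever branch is taken, every call lands on the owning catalog, which is
exactly the property the previous CommandUtils-based path lacked.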