Posted to commits@hudi.apache.org by "YannByron (via GitHub)" <gi...@apache.org> on 2023/03/23 02:26:56 UTC

[GitHub] [hudi] YannByron commented on a diff in pull request #7847: [HUDI-5697] Revisiting refreshing of Hudi relations after write operations on the tables

YannByron commented on code in PR #7847:
URL: https://github.com/apache/hudi/pull/7847#discussion_r1145600146


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalogUtils.scala:
##########
@@ -17,8 +17,76 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+
 /**
  * NOTE: Since support for [[TableCatalog]] was only added in Spark 3, this trait
  *       is going to be an empty one simply serving as a placeholder (for compatibility w/ Spark 2)
  */
 trait HoodieCatalogUtils {}
+
+object HoodieCatalogUtils {
+
+  /**
+   * Please check the scala-doc for the other overloaded [[refreshTable()]] operation
+   */
+  def refreshTable(spark: SparkSession, qualifiedTableName: String): Unit = {
+    val tableId = spark.sessionState.sqlParser.parseTableIdentifier(qualifiedTableName)
+    refreshTable(spark, tableId)
+  }
+
+  /**
+   * Refreshes metadata and flushes cached data (resolved [[LogicalPlan]] representation,
+   * already loaded [[InMemoryRelation]]) for the table identified by [[tableId]].
+   *
+   * This method is usually invoked at the end of the write operation to make sure cached
+   * data/metadata are synchronized with the state on storage.
+   *
+   * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+   *       This is borrowed from Spark 3.1.3 and modified to satisfy Hudi's needs:
+   *          - Unlike the canonical Spark implementation, in Hudi's case this method is
+   *            also invoked after writes carried out via the Spark DataSource integration,
+   *            and in those cases data might actually be missing from the caches, therefore
+   *            re-triggering the resolution phase (involving file-listing, etc.) for the
+   *            first time
+   *          - Additionally, this method is modified to avoid refreshing the
+   *            [[LogicalRelation]] completely, to make sure that we're not re-triggering
+   *            the file-listing of the table immediately after it's been written, instead
+   *            deferring it to a subsequent read operation
+   */
+  def refreshTable(spark: SparkSession, tableId: TableIdentifier): Unit = {
+    val sessionCatalog = spark.sessionState.catalog
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableId)
+
+    // Before proceeding, we validate whether this table is actually cached within the
+    // [[SessionCatalog]] since, for example, when writing via the Spark DataSource (V1)
+    // API, Spark wouldn't actually resort to caching the data
+    val cachedPlan = sessionCatalog.getCachedTable(
+      QualifiedTableName(tableId.database.getOrElse(tableMetadata.database), tableId.identifier))
+
+    if (cachedPlan != null) {
+      // NOTE: Provided that this table is still cached, the following operation will not
+      //       trigger subsequent resolution and listing of the table
+      val table = spark.table(tableId)
+
+      if (tableMetadata.tableType == CatalogTableType.VIEW) {

Review Comment:
   https://github.com/apache/spark/pull/31265 suggests not invalidating the view relation, so should we keep the cached view relation and do nothing if `tableMetadata.tableType == CatalogTableType.VIEW`?
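
   For illustration, a minimal sketch of the suggested behavior (hypothetical code, not part of the PR; the object and method names are invented): skip cache invalidation entirely when the catalog entry is a view, and refresh only non-view relations. It reuses the `getTempViewOrPermanentTableMetadata` call already shown in the diff.

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.catalyst.TableIdentifier
   import org.apache.spark.sql.catalyst.catalog.CatalogTableType

   object ViewAwareRefresh {
     // Hypothetical sketch of the suggestion above: leave cached view relations
     // untouched (per apache/spark#31265) and refresh only non-view relations
     def refreshSkippingViews(spark: SparkSession, tableId: TableIdentifier): Unit = {
       val tableMetadata =
         spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(tableId)
       if (tableMetadata.tableType != CatalogTableType.VIEW) {
         // Invalidate cached metadata/data for the table itself; views fall
         // through and are deliberately left as-is
         spark.sessionState.catalog.refreshTable(tableId)
       }
     }
   }
   ```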



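For context, the doc-comment in the diff above notes that `refreshTable` is usually invoked at the end of a write operation. A minimal usage sketch follows (the session setup and the table name `db.some_hudi_table` are hypothetical, not from the PR):

```scala
import org.apache.spark.sql.{HoodieCatalogUtils, SparkSession}

object RefreshAfterWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-refresh-example")
      .master("local[*]")
      .getOrCreate()

    // ... a Hudi write via the Spark DataSource API would happen here ...

    // Flush cached plans/data for the table that was just written; per the
    // doc-comment, the actual file-listing is deferred to the next read
    HoodieCatalogUtils.refreshTable(spark, "db.some_hudi_table")
  }
}
```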