Posted to issues@spark.apache.org by "William Wong (JIRA)" <ji...@apache.org> on 2019/03/05 16:05:00 UTC

[jira] [Updated] (SPARK-27062) Refresh Table command register table with table name only

     [ https://issues.apache.org/jira/browse/SPARK-27062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

William Wong updated SPARK-27062:
---------------------------------
    Description: 
If the CatalogImpl.refreshTable() method is invoked against a cached table, it first uncaches the corresponding query in the shared state cache manager and then caches it again to refresh the cached copy.

However, the table is recached under the bare table name only; the database name is dropped. Therefore, if the cached table is not in the default database, the recreated cache entry may appear to refer to a different table. For example, the cached table name shown on the driver's Storage page changes after the table is refreshed.

Here is the related code on GitHub for reference.

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala] 

 
{code:java}
override def refreshTable(tableName: String): Unit = {
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
  val table = sparkSession.table(tableIdent)

  if (tableMetadata.tableType == CatalogTableType.VIEW) {
    // Temp or persistent views: refresh (or invalidate) any metadata/data cached
    // in the plan recursively.
    table.queryExecution.analyzed.refresh()
  } else {
    // Non-temp tables: refresh the metadata cache.
    sessionCatalog.refreshTable(tableIdent)
  }

  // If this table is cached as an InMemoryRelation, drop the original
  // cached version and make the new version cached lazily.
  if (isCached(table)) {
    // Uncache the logicalPlan.
    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, blocking = true)
    // Cache it again.
    sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
  }
}
{code}
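
To make the effect of Some(tableIdent.table) concrete, here is a minimal, Spark-free sketch. TableIdent, CacheNameSketch, parse, currentLabel, proposedLabel, and the name "sales_db.orders" are all hypothetical and introduced only for illustration; TableIdent is a stand-in with two fields, not Spark's real TableIdentifier, and the parser assumes plain, unquoted names.

```scala
// Hypothetical stand-in for a parsed table identifier: only the two fields
// that matter for this issue, not Spark's real class.
final case class TableIdent(table: String, database: Option[String])

object CacheNameSketch {
  // Simplified parser (assumption): splits "db.tbl" on the first dot.
  // The real SQL parser also handles quoting, which is ignored here.
  def parse(name: String): TableIdent = name.split("\\.", 2) match {
    case Array(db, tbl) => TableIdent(tbl, Some(db))
    case _              => TableIdent(name, None)
  }

  // Label used when recaching with the bare table name: the database is lost.
  def currentLabel(name: String): String = parse(name).table

  // Label when reusing the name exactly as the caller passed it.
  def proposedLabel(name: String): String = name

  def main(args: Array[String]): Unit = {
    val name = "sales_db.orders"
    println(s"current:  ${currentLabel(name)}")  // database part dropped
    println(s"proposed: ${proposedLabel(name)}") // database part kept
  }
}
```

With a qualified name, the first label loses the database part while the second keeps it; for an unqualified name the two labels are identical, which is why the problem only shows up outside the default database.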
 

 

In the Spark SQL module, the database name is registered together with the table name when the "CACHE TABLE" command is executed.

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala] 

CatalogImpl.cacheTable, in turn, registers the cache under the table name it receives.
{code:java}
override def cacheTable(tableName: String): Unit = {
  sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
}
{code}
 

Therefore, I propose aligning the behavior: the refreshTable method should reuse the table name it receives. That is, change

{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
{code}
to
{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName))
{code}
 

  was:
If the CatalogImpl.refreshTable() method is invoked against a cached table, it first uncaches the corresponding query in the shared state cache manager and then caches it again to refresh the cached copy.

However, the table is recached under the bare table name only; the database name is dropped. Therefore, if the cached table is not in the default database, the recreated cache entry may appear to refer to a different table. For example, the cached table name shown on the driver's Storage page changes after the table is refreshed.

Here is the related code on GitHub for reference.

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala] 

 
{code:java}
override def refreshTable(tableName: String): Unit = {
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
  val table = sparkSession.table(tableIdent)

  if (tableMetadata.tableType == CatalogTableType.VIEW) {
    // Temp or persistent views: refresh (or invalidate) any metadata/data cached
    // in the plan recursively.
    table.queryExecution.analyzed.refresh()
  } else {
    // Non-temp tables: refresh the metadata cache.
    sessionCatalog.refreshTable(tableIdent)
  }

  // If this table is cached as an InMemoryRelation, drop the original
  // cached version and make the new version cached lazily.
  if (isCached(table)) {
    // Uncache the logicalPlan.
    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, blocking = true)
    // Cache it again.
    sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
  }
}
{code}
 

 

In the Spark SQL module, the database name is registered together with the table name when the "CACHE TABLE" command is executed.

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala] 

 

 

Therefore, I propose aligning the behavior: the full table name should also be used in the refreshTable case. We should change the following line in CatalogImpl.refreshTable from

{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
{code}
to
{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.quotedString))
{code}
 


> Refresh Table command register table with table name only
> ---------------------------------------------------------
>
>                 Key: SPARK-27062
>                 URL: https://issues.apache.org/jira/browse/SPARK-27062
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2
>            Reporter: William Wong
>            Priority: Major
>              Labels: easyfix
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>


