You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/01/20 13:46:00 UTC

[spark] branch branch-3.2 updated: [SPARK-37963][SQL] Need to update Partition URI after renaming table in InMemoryCatalog

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 4a8e4cb  [SPARK-37963][SQL] Need to update Partition URI after renaming table in InMemoryCatalog
4a8e4cb is described below

commit 4a8e4cbd97fef8c3e298f1c3f264bb85f06c19e0
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Thu Jan 20 21:39:34 2022 +0800

    [SPARK-37963][SQL] Need to update Partition URI after renaming table in InMemoryCatalog
    
    ### What changes were proposed in this pull request?
    
    After renaming a partitioned table in InMemoryCatalog, selecting from the new table returns an empty result.
    
    The following checkAnswer will fail as the result is empty.
    ```
    sql(s"create table foo(i int, j int) using PARQUET partitioned by (j)")
    sql("insert into table foo partition(j=2) values (1)")
    sql(s"alter table foo rename to bar")
    checkAnswer(spark.table("bar"), Row(1, 2))
    ```
    To fix the bug, we need to update the partition URIs after renaming a table in InMemoryCatalog.
    ### Why are the changes needed?
    
    Bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, InMemoryCatalog is used internally and HMS doesn't have this bug.
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #35251 from gengliangwang/fixAlterRename.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
    (cherry picked from commit 851eb280424777e0855310878609e764c3774977)
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 13 +++++++++++--
 .../sql/execution/command/AlterTableRenameSuiteBase.scala   | 10 ++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 7d29a9e..ef8085f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -50,7 +50,7 @@ class InMemoryCatalog(
   import CatalogTypes.TablePartitionSpec
 
   private class TableDesc(var table: CatalogTable) {
-    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
+    var partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
   }
 
   private class DatabaseDesc(var db: CatalogDatabase) {
@@ -297,8 +297,17 @@ class InMemoryCatalog(
             oldName, newName, oldDir, e)
       }
       oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
-    }
 
+      val newPartitions = oldDesc.partitions.map { case (spec, partition) =>
+        val storage = partition.storage
+        val newLocationUri = storage.locationUri.map { uri =>
+          new Path(uri.toString.replace(oldDir.toString, newDir.toString)).toUri
+        }
+        val newPartition = partition.copy(storage = storage.copy(locationUri = newLocationUri))
+        (spec, newPartition)
+      }
+      oldDesc.partitions = newPartitions
+    }
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenameSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenameSuiteBase.scala
index 6370939..1803ec0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenameSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenameSuiteBase.scala
@@ -126,4 +126,14 @@ trait AlterTableRenameSuiteBase extends QueryTest with DDLCommandTestUtils {
       spark.sessionState.catalogManager.reset()
     }
   }
+
+  test("SPARK-37963: preserve partition info") {
+    withNamespaceAndTable("ns", "dst_tbl") { dst =>
+      val src = dst.replace("dst", "src")
+      sql(s"CREATE TABLE $src (i int, j int) $defaultUsing partitioned by (j)")
+      sql(s"insert into table $src partition(j=2) values (1)")
+      sql(s"ALTER TABLE $src RENAME TO ns.dst_tbl")
+      checkAnswer(spark.table(dst), Row(1, 2))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org