You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/28 10:00:01 UTC

[spark] branch branch-3.1 updated: [SPARK-34262][SQL][3.1] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 451baad  [SPARK-34262][SQL][3.1] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION`
451baad is described below

commit 451baad7816c655bb64e7179ec8994644eca18c5
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Thu Jan 28 18:59:34 2021 +0900

    [SPARK-34262][SQL][3.1] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION`
    
    ### What changes were proposed in this pull request?
    Invoke `CatalogImpl.refreshTable()` in v1 implementation of the `ALTER TABLE .. SET LOCATION` command to refresh cached table data.
    
    ### Why are the changes needed?
    The example below demonstrates the issue:
    
    - Create a source table:
    ```sql
    spark-sql> CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
    spark-sql> INSERT INTO src_tbl PARTITION (part=0) SELECT 0;
    spark-sql> SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0);
    default	src_tbl	false	Partition Values: [part=0]
    Location: file:/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0
    ...
    ```
    - Set new location for the empty partition (part=0):
    ```sql
    spark-sql> CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part);
    spark-sql> ALTER TABLE dst_tbl ADD PARTITION (part=0);
    spark-sql> INSERT INTO dst_tbl PARTITION (part=1) SELECT 1;
    spark-sql> CACHE TABLE dst_tbl;
    spark-sql> SELECT * FROM dst_tbl;
    1	1
    spark-sql> ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0';
    spark-sql> SELECT * FROM dst_tbl;
    1	1
    ```
    The last query does not return newly loaded data.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above works correctly:
    ```sql
    spark-sql> ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0';
    spark-sql> SELECT * FROM dst_tbl;
    0	0
    1	1
    ```
    
    ### How was this patch tested?
    Added new test to `org.apache.spark.sql.hive.CachedTableSuite`:
    ```
    $ build/sbt -Phive -Phive-thriftserver "test:testOnly *CachedTableSuite"
    ```
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls223@apache.org>
    (cherry picked from commit d242166b8fd741fdd46d9048f847b2fd6e1d07b1)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #31379 from MaxGekk/refresh-cache-set-location-3.1.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../apache/spark/sql/execution/command/ddl.scala   |  2 +-
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 39 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 94c03c5..2f96c45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -835,7 +835,7 @@ case class AlterTableSetLocationCommand(
         // No partition spec is specified, so we set the location for the table itself
         catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
     }
-
+    sparkSession.catalog.refreshTable(table.identifier.quotedString)
     CommandUtils.updateTableStats(sparkSession, table)
     Seq.empty[Row]
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 2e58c0e..f9a4ff1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -552,28 +552,51 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     }
   }
 
+  private def getPartitionLocation(t: String, partition: String): String = {
+    val information = sql(s"SHOW TABLE EXTENDED LIKE '$t' PARTITION ($partition)")
+      .select("information")
+      .first().getString(0)
+    information
+      .split("\\r?\\n")
+      .filter(_.startsWith("Location:"))
+      .head
+      .replace("Location: file:", "")
+  }
+
   test("SPARK-34213: LOAD DATA refreshes cached table") {
     withTable("src_tbl") {
       withTable("dst_tbl") {
         sql("CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part)")
         sql("INSERT INTO src_tbl PARTITION (part=0) SELECT 0")
-        val information = sql("SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0)")
-          .select("information")
-          .first().getString(0)
-        val location = information
-          .split("\\r?\\n")
-          .filter(_.startsWith("Location:"))
-          .head
-          .replace("Location: file:", "")
         sql("CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part)")
         sql("INSERT INTO dst_tbl PARTITION (part=1) SELECT 1")
         sql("CACHE TABLE dst_tbl")
         assert(spark.catalog.isCached("dst_tbl"))
         checkAnswer(sql("SELECT * FROM dst_tbl"), Row(1, 1))
+        val location = getPartitionLocation("src_tbl", "part=0")
         sql(s"LOAD DATA LOCAL INPATH '$location' INTO TABLE dst_tbl PARTITION (part=0)")
         assert(spark.catalog.isCached("dst_tbl"))
         checkAnswer(sql("SELECT * FROM dst_tbl"), Seq(Row(0, 0), Row(1, 1)))
       }
     }
   }
+
+  test("SPARK-34262: ALTER TABLE .. SET LOCATION refreshes cached table") {
+    withTable("src_tbl") {
+      withTable("dst_tbl") {
+        sql("CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part)")
+        sql("INSERT INTO src_tbl PARTITION (part=0) SELECT 0")
+        sql("CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part)")
+        sql("ALTER TABLE dst_tbl ADD PARTITION (part=0)")
+        sql("INSERT INTO dst_tbl PARTITION (part=1) SELECT 1")
+        sql("CACHE TABLE dst_tbl")
+        assert(spark.catalog.isCached("dst_tbl"))
+        checkAnswer(sql("SELECT * FROM dst_tbl"), Row(1, 1))
+        val location = getPartitionLocation("src_tbl", "part=0")
+        sql(s"ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '$location'")
+        assert(spark.catalog.isCached("dst_tbl"))
+        checkAnswer(sql("SELECT * FROM dst_tbl"), Seq(Row(0, 0), Row(1, 1)))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org