You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/11 00:36:45 UTC

[spark] branch branch-3.0 updated: [SPARK-34055][SQL][3.0] Refresh cache in `ALTER TABLE .. ADD PARTITION`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 471a089  [SPARK-34055][SQL][3.0] Refresh cache in `ALTER TABLE .. ADD PARTITION`
471a089 is described below

commit 471a0896198f878a1adc974e2c72da081001d964
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 11 09:36:16 2021 +0900

    [SPARK-34055][SQL][3.0] Refresh cache in `ALTER TABLE .. ADD PARTITION`
    
    ### What changes were proposed in this pull request?
    Invoke `refreshTable()` from `CatalogImpl`, which refreshes the table's cache, at the end of the v1 `ALTER TABLE .. ADD PARTITION` command.
    
    ### Why are the changes needed?
    This fixes the issue demonstrated by the following example:
    ```sql
    spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
    spark-sql> insert into tbl partition (part=0) select 0;
    spark-sql> cache table tbl;
    spark-sql> select * from tbl;
    0	0
    spark-sql> show table extended like 'tbl' partition(part=0);
    default	tbl	false	Partition Values: [part=0]
    Location: file:/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0
    ...
    ```
    Create a new partition by copying the existing one:
    ```
    $ cp -r /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0 /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1
    ```
    ```sql
    spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
    spark-sql> select * from tbl;
    0	0
    ```
    
    The last query must also return `0	1`, since that row belongs to the partition added by `ALTER TABLE .. ADD PARTITION`; instead, the stale cached data is returned.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above returns both rows:
    ```sql
    ...
    spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
    spark-sql> select * from tbl;
    0	0
    0	1
    ```
    
    ### How was this patch tested?
    By running the affected test suite:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
    ```
    
    Closes #31116 from MaxGekk/add-partition-refresh-cache-2-3.0.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../apache/spark/sql/execution/command/ddl.scala   |  1 +
 .../org/apache/spark/sql/CachedTableSuite.scala    | 31 ++++++++++++++++++++++
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 29 ++++++++++++++++++++
 3 files changed, 61 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 11e695a..1380e1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -484,6 +484,7 @@ case class AlterTableAddPartitionCommand(
       catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
     }
 
+    sparkSession.catalog.refreshTable(table.identifier.quotedString)
     if (table.stats.nonEmpty) {
       if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
         val addedSize = CommandUtils.calculateTotalLocationSize(sparkSession, table.identifier,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index b535734..de8d595 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.CleanerListener
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
@@ -1300,4 +1304,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       QueryTest.checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 2), Row(1, 1)))
     }
   }
+
+  test("SPARK-34055: refresh cache in partition adding") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
+      sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+      assert(!spark.catalog.isCached("t"))
+      sql("CACHE TABLE t")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))
+
+      // Create new partition (part = 1) in the filesystem
+      val information = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
+        .select("information")
+        .first().getString(0)
+      val part0Loc = information
+        .split("\\r?\\n")
+        .filter(_.startsWith("Location:"))
+        .head
+        .replace("Location: file:", "")
+      val part1Loc = part0Loc.replace("part=0", "part=1")
+      FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+
+      sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$part1Loc'")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index dc909fd..26918c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -469,4 +471,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       QueryTest.checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 2), Row(1, 1)))
     }
   }
+
+  test("SPARK-34055: refresh cache in partition adding") {
+    withTable("t") {
+      sql("CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)")
+      sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+      assert(!spark.catalog.isCached("t"))
+      sql("CACHE TABLE t")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))
+
+      // Create new partition (part = 1) in the filesystem
+      val information = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
+        .select("information")
+        .first().getString(0)
+      val part0Loc = information
+        .split("\\r?\\n")
+        .filter(_.startsWith("Location:"))
+        .head
+        .replace("Location: file:", "")
+      val part1Loc = part0Loc.replace("part=0", "part=1")
+      FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+
+      sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$part1Loc'")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org