Posted to commits@spark.apache.org by do...@apache.org on 2021/01/04 21:34:25 UTC

[spark] branch branch-3.0 updated: [SPARK-33950][SQL][3.1][3.0] Refresh cache in v1 `ALTER TABLE .. DROP PARTITION`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e882c90  [SPARK-33950][SQL][3.1][3.0] Refresh cache in v1 `ALTER TABLE .. DROP PARTITION`
e882c90 is described below

commit e882c9058fa010f133cd794c114c67cfd4541186
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 4 13:33:41 2021 -0800

    [SPARK-33950][SQL][3.1][3.0] Refresh cache in v1 `ALTER TABLE .. DROP PARTITION`
    
    ### What changes were proposed in this pull request?
    Invoke `refreshTable()` from `AlterTableDropPartitionCommand.run()` after the partitions have been dropped. In particular, this re-creates the cache associated with the modified table.
    
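    For reference, `refreshTable()` is also exposed on the public `Catalog` API; the added call is the internal counterpart of the following user-level sketch (table name `tbl1` is illustrative; `spark` is the `SparkSession`, as predefined in spark-shell):
    ```scala
    // Invalidate cached data and metadata for tbl1. Because tbl1 is cached,
    // the cache entry is re-created and repopulated lazily on the next scan,
    // so it no longer contains rows from dropped partitions.
    spark.catalog.refreshTable("tbl1")
    ```
    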
    ### Why are the changes needed?
    This fixes the issue demonstrated by the following example:
    ```sql
    spark-sql> CREATE TABLE tbl1 (col0 int, part0 int) USING parquet PARTITIONED BY (part0);
    spark-sql> INSERT INTO tbl1 PARTITION (part0=0) SELECT 0;
    spark-sql> INSERT INTO tbl1 PARTITION (part0=1) SELECT 1;
    spark-sql> CACHE TABLE tbl1;
    spark-sql> SELECT * FROM tbl1;
    0	0
    1	1
    spark-sql> ALTER TABLE tbl1 DROP PARTITION (part0=0);
    spark-sql> SELECT * FROM tbl1;
    0	0
    1	1
    ```
    The last query must not return the row `0	0`, since that partition was dropped by the previous command.
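    
    Prior to this fix, the stale cache could be worked around by refreshing the table manually. A sketch of that workaround (Scala, run in spark-shell; not part of the original report):
    ```scala
    // REFRESH TABLE invalidates the cached data and metadata for tbl1;
    // since tbl1 was cached, the cache is rebuilt from the current partitions.
    spark.sql("REFRESH TABLE tbl1")
    spark.sql("SELECT * FROM tbl1").show()  // only the remaining row (1, 1)
    ```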
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above behaves as expected:
    ```sql
    ...
    spark-sql> ALTER TABLE tbl1 DROP PARTITION (part0=0);
    spark-sql> SELECT * FROM tbl1;
    1	1
    ```
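    
    Note that the table stays cached after `DROP PARTITION`: the refresh rebuilds the cache rather than dropping it, which the new tests assert via `spark.catalog.isCached`.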
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
    ```
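    
    The behavior can also be verified manually; a sketch that mirrors the added test (Scala, paste into spark-shell, where `spark` is predefined):
    ```scala
    spark.sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")
    spark.sql("INSERT INTO t PARTITION (part=0) SELECT 0")
    spark.sql("INSERT INTO t PARTITION (part=1) SELECT 1")
    spark.sql("CACHE TABLE t")
    spark.sql("ALTER TABLE t DROP PARTITION (part=0)")
    // With the fix, the cached result reflects the drop: a single row (1, 1).
    spark.sql("SELECT * FROM t").show()
    ```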
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Wenchen Fan <wenchen@databricks.com>
    (cherry picked from commit 67195d0d977caa5a458e8a609c434205f9b54d1b)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #31006 from MaxGekk/drop-partition-refresh-cache-3.1.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit eef0e4c0389c815e8df25fd2c23a73fd85e20029)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/execution/command/ddl.scala    |  1 +
 .../scala/org/apache/spark/sql/CachedTableSuite.scala   | 15 +++++++++++++++
 .../org/apache/spark/sql/hive/CachedTableSuite.scala    | 17 ++++++++++++++++-
 3 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f41c4ec..748bb1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -580,6 +580,7 @@ case class AlterTableDropPartitionCommand(
       table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
 
+    sparkSession.catalog.refreshTable(table.identifier.quotedString)
     CommandUtils.updateTableStats(sparkSession, table)
 
     Seq.empty[Row]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e14c72e..0e8122e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1270,4 +1270,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  test("SPARK-33950: refresh cache after partition dropping") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
+      sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+      sql("INSERT INTO t PARTITION (part=1) SELECT 1")
+      assert(!spark.catalog.isCached("t"))
+      sql("CACHE TABLE t")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(1, 1)))
+      sql("ALTER TABLE t DROP PARTITION (part=0)")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1)))
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 6c0ab2f..ed3ccb4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
-import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
@@ -439,4 +439,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       assert(spark.catalog.isCached(t))
     }
   }
+
+  test("SPARK-33950: refresh cache after partition dropping") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)")
+      sql("INSERT INTO t PARTITION (part=0) SELECT 0")
+      sql("INSERT INTO t PARTITION (part=1) SELECT 1")
+      assert(!spark.catalog.isCached("t"))
+      sql("CACHE TABLE t")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(1, 1)))
+      sql("ALTER TABLE t DROP PARTITION (part=0)")
+      assert(spark.catalog.isCached("t"))
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org