You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/04 18:27:10 UTC

[spark] branch master updated: [SPARK-33990][SQL][TESTS] Remove partition data by v2 `ALTER TABLE .. DROP PARTITION`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc3f226  [SPARK-33990][SQL][TESTS] Remove partition data by v2 `ALTER TABLE .. DROP PARTITION`
fc3f226 is described below

commit fc3f22645e5c542e80a086d96da384feb6afe121
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 4 10:26:39 2021 -0800

    [SPARK-33990][SQL][TESTS] Remove partition data by v2 `ALTER TABLE .. DROP PARTITION`
    
    ### What changes were proposed in this pull request?
    Remove the partition's data when `ALTER TABLE .. DROP PARTITION` is executed against the V2 table catalog used in tests.
    
    ### Why are the changes needed?
    This is a bug fix. Before the fix, `ALTER TABLE .. DROP PARTITION` did not remove the data belonging to the dropped partition. As a consequence, a `select` query still returned the removed data.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    By running the test suites for the v1 and v2 catalogs:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableDropPartitionSuite"
    ```
    
    Closes #31014 from MaxGekk/fix-drop-partition-v2.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/connector/InMemoryAtomicPartitionTable.scala  |  1 +
 .../apache/spark/sql/connector/InMemoryPartitionTable.scala |  1 +
 .../org/apache/spark/sql/connector/InMemoryTable.scala      |  4 ++++
 .../command/AlterTableDropPartitionSuiteBase.scala          | 13 ++++++++++++-
 4 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
index c2a95cc..f313c6c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryAtomicPartitionTable.scala
@@ -49,6 +49,7 @@ class InMemoryAtomicPartitionTable (
   override def dropPartition(ident: InternalRow): Boolean = {
     if (memoryTablePartitions.containsKey(ident)) {
       memoryTablePartitions.remove(ident)
+      removePartitionKey(ident.toSeq(schema))
       true
     } else {
       false
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
index a3d610a..9e3555b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryPartitionTable.scala
@@ -61,6 +61,7 @@ class InMemoryPartitionTable(
   def dropPartition(ident: InternalRow): Boolean = {
     if (memoryTablePartitions.containsKey(ident)) {
       memoryTablePartitions.remove(ident)
+      removePartitionKey(ident.toSeq(schema))
       true
     } else {
       false
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index 201d67a..a1253df 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -187,6 +187,10 @@ class InMemoryTable(
     true
   }
 
+  protected def removePartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
+    dataMap.remove(key)
+  }
+
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
       val key = getKey(row)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index cf8a1e9..d8a8920 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
 import org.apache.spark.sql.internal.SQLConf
 
@@ -144,4 +144,15 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
       checkPartitions(t)
     }
   }
+
+  test("SPARK-33990: don not return data from dropped partition") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
+      sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+      sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+      QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(0, 0), Row(1, 1)))
+      sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
+      QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 1)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org