Posted to commits@paimon.apache.org by lz...@apache.org on 2024/04/03 04:06:33 UTC

(paimon) branch master updated: [spark] Support DELETE for append table (#3140)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 329ca2b3c [spark] Support DELETE for append table (#3140)
329ca2b3c is described below

commit 329ca2b3c697b4fc97cc8d70a662c5e7c74944af
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Wed Apr 3 12:06:28 2024 +0800

    [spark] Support DELETE for append table (#3140)
---
 .../Compatibility.scala}                           |  19 ++--
 .../catalyst/analysis/PaimonDeleteTable.scala      |  18 +++-
 .../paimon/spark/catalyst/Compatibility.scala}     |  19 ++--
 .../paimon/spark/catalyst/Compatibility.scala}     |  19 ++--
 .../catalyst/analysis/PaimonDeleteTable.scala      |  17 ++-
 .../spark/catalyst/analysis/RowLevelOp.scala       |   2 +-
 .../analysis/expressions/ExpressionHelper.scala    |  11 ++
 .../commands/DeleteFromPaimonTableCommand.scala    | 118 ++++++++++++++-------
 .../spark/commands/MergeIntoPaimonTable.scala      |   7 +-
 .../paimon/spark/commands/PaimonCommand.scala      |  94 +++++++++++++++-
 .../commands/PaimonTruncateTableCommand.scala      |   2 +-
 .../spark/commands/UpdatePaimonTableCommand.scala  |  92 +++-------------
 .../apache/paimon/spark/sources/PaimonSink.scala   |   3 +-
 .../paimon/spark/sql/DeleteFromTableTest.scala     |  73 ++++++++++++-
 14 files changed, 342 insertions(+), 152 deletions(-)
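
For context, this change lets Spark's DELETE FROM statement run against Paimon append (non-primary-key) tables, which previously threw UnsupportedOperationException. A minimal usage sketch in Scala, assuming a Spark session with the Paimon catalog configured; the table and values mirror the tests added by this commit and are illustrative only:

    spark.sql("CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)")
    spark.sql("INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025')")

    // Row-level delete on a non-partition column: touched data files are rewritten.
    spark.sql("DELETE FROM T WHERE name = 'a'")

    // Partition-equality-only delete: handled by dropping the matching partitions.
    spark.sql("DELETE FROM T WHERE dt = '2025'")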

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
similarity index 61%
copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
copy to paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b131384de..a86c9186b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -16,14 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst
 
-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {
+
+  def createDataSourceV2ScanRelation(
+      relation: DataSourceV2Relation,
+      scan: Scan,
+      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+    DataSourceV2ScanRelation(relation, scan, output)
+  }
 
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
-  extends DeleteFromPaimonTableCommandBase {
-  override def condition(): Expression = delete.condition.orNull
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
similarity index 66%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
copy to paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index f2800d742..7da432d56 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -19,10 +19,14 @@
 package org.apache.paimon.spark.catalyst.analysis
 
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
+import scala.collection.JavaConverters._
+
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
   override val operation: RowLevelOp = Delete
@@ -32,7 +36,19 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
       case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
         checkPaimonTable(table.getTable)
 
-        DeleteFromPaimonTableCommand(table, d)
+        table.getTable match {
+          case paimonTable: FileStoreTable =>
+            val primaryKeys = paimonTable.primaryKeys().asScala
+            if (primaryKeys.isEmpty) {
+              condition.foreach(checkSubquery)
+            }
+
+            val relation = PaimonRelation.getPaimonRelation(d.table)
+            DeleteFromPaimonTableCommand(relation, paimonTable, condition.getOrElse(TrueLiteral))
+
+          case _ =>
+            throw new RuntimeException("Delete Operation is only supported for FileStoreTable.")
+        }
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
similarity index 61%
copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
copy to paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b131384de..a86c9186b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -16,14 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst
 
-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {
+
+  def createDataSourceV2ScanRelation(
+      relation: DataSourceV2Relation,
+      scan: Scan,
+      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+    DataSourceV2ScanRelation(relation, scan, output)
+  }
 
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
-  extends DeleteFromPaimonTableCommandBase {
-  override def condition(): Expression = delete.condition.orNull
 }
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
similarity index 61%
rename from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b131384de..a86c9186b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -16,14 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst
 
-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {
+
+  def createDataSourceV2ScanRelation(
+      relation: DataSourceV2Relation,
+      scan: Scan,
+      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+    DataSourceV2ScanRelation(relation, scan, output)
+  }
 
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
-  extends DeleteFromPaimonTableCommandBase {
-  override def condition(): Expression = delete.condition.orNull
 }
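
The three Compatibility objects above (paimon-spark-3.2, paimon-spark-3.3 and paimon-spark-common) are identical in this commit; the extra indirection presumably exists so each Spark-version module can adapt if the DataSourceV2ScanRelation constructor changes between Spark releases (an assumption about intent, not stated in the commit). Shared code only constructs the scan relation through the shim, e.g. (hypothetical call site, mirroring its use later in this commit):

    val scanRelation = Compatibility.createDataSourceV2ScanRelation(
      relation,                            // DataSourceV2Relation of the Paimon table
      PaimonSplitScan(table, dataSplits),  // scan restricted to the touched splits
      relation.output)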
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index f2800d742..7b0f0cb64 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -19,10 +19,13 @@
 package org.apache.paimon.spark.catalyst.analysis
 
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
+import scala.collection.JavaConverters._
+
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
   override val operation: RowLevelOp = Delete
@@ -32,7 +35,19 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
       case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
         checkPaimonTable(table.getTable)
 
-        DeleteFromPaimonTableCommand(table, d)
+        table.getTable match {
+          case paimonTable: FileStoreTable =>
+            val primaryKeys = paimonTable.primaryKeys().asScala
+            if (primaryKeys.isEmpty) {
+              checkSubquery(condition)
+            }
+
+            val relation = PaimonRelation.getPaimonRelation(d.table)
+            DeleteFromPaimonTableCommand(relation, paimonTable, condition)
+
+          case _ =>
+            throw new RuntimeException("Delete Operation is only supported for FileStoreTable.")
+        }
     }
   }
 }
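
With this rule in place, DeleteFromTable on a Paimon FileStoreTable is rewritten into DeleteFromPaimonTableCommand; for tables without primary keys the condition is additionally checked for subqueries, and non-FileStoreTable targets are rejected. A behavior sketch for an append-only table (table names are illustrative; the subquery case matches the new test at the end of this commit):

    spark.sql("DELETE FROM T WHERE id = 1")  // accepted, planned as DeleteFromPaimonTableCommand
    spark.sql("DELETE FROM T WHERE id IN (SELECT id FROM updated_ids)")  // fails: "Subqueries are not supported"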
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
index f83cf91d8..2f5892a02 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
@@ -48,7 +48,7 @@ case object Delete extends RowLevelOp {
 
   override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)
 
-  override val supportAppendOnlyTable: Boolean = false
+  override val supportAppendOnlyTable: Boolean = true
 
 }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 8657c70ad..4e7a8109b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -112,6 +112,17 @@ trait ExpressionHelper extends PredicateHelper {
     }
   }
 
+  def splitPruePartitionAndOtherPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      resolver: Resolver): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition)
+      .partition {
+        isPredicatePartitionColumnsOnly(_, partitionColumns, resolver) && !SubqueryExpression
+          .hasSubquery(condition)
+      }
+  }
+
   def isPredicatePartitionColumnsOnly(
       condition: Expression,
       partitionColumns: Seq[String],
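
splitPruePartitionAndOtherPredicates splits the conjuncts of a condition into predicates that reference only partition columns (and contain no subquery) and everything else. A toy, self-contained model of that split (Paimon's real implementation works on Catalyst Expressions, so Pred below is purely illustrative):

    final case class Pred(sql: String, columns: Set[String], hasSubquery: Boolean = false)

    def split(conjuncts: Seq[Pred], partitionColumns: Set[String]): (Seq[Pred], Seq[Pred]) =
      conjuncts.partition(p => p.columns.subsetOf(partitionColumns) && !p.hasSubquery)

    val (partitionOnly, other) =
      split(Seq(Pred("dt = '2025'", Set("dt")), Pred("name = 'a'", Set("name"))), Set("dt"))
    // partitionOnly contains the dt = '2025' predicate; other contains name = 'a'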
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 84922abbc..bd73c8fc5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,77 +18,123 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.options.Options
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
-import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.Utils.createDataset
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.functions.lit
 
 import java.util.{Collections, UUID}
 
-import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
+import scala.util.Try
 
-trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with PaimonCommand {
-  self: DeleteFromPaimonTableCommand =>
-  override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
+case class DeleteFromPaimonTableCommand(
+    relation: DataSourceV2Relation,
+    override val table: FileStoreTable,
+    condition: Expression)
+  extends PaimonLeafRunnableCommand
+  with PaimonCommand
+  with ExpressionHelper
+  with SupportsSubquery {
 
-  private val relation = delete.table
+  private lazy val writer = PaimonSparkWriter(table)
 
-  def condition(): Expression
+  override def run(sparkSession: SparkSession): Seq[Row] = {
 
-  private lazy val (deletePredicate, forceDeleteByRows) =
+    val commit = table.store.newCommit(UUID.randomUUID.toString)
     if (condition == null || condition == TrueLiteral) {
-      (None, false)
+      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
-      try {
-        (convertConditionToPaimonPredicate(condition(), relation.output, table.rowType()), false)
-      } catch {
-        case NonFatal(_) =>
-          (None, true)
+      val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
+        condition,
+        table.partitionKeys().asScala,
+        sparkSession.sessionState.conf.resolver)
+
+      // TODO: provide another partition visitor to support more partition predicates.
+      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
+      val partitionPredicate = if (partitionCondition.isEmpty) {
+        None
+      } else {
+        convertConditionToPaimonPredicate(
+          partitionCondition.reduce(And),
+          relation.output,
+          rowType,
+          ignoreFailure = true)
       }
-    }
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val commit = table.store.newCommit(UUID.randomUUID.toString)
+      // We do not have to scan the table if the following three requirements are met:
+      // 1) there is no other (non-partition) predicate;
+      // 2) the partition condition can be converted to a Paimon predicate;
+      // 3) the partition predicate can be visited by OnlyPartitionKeyEqualVisitor.
+      val forceDeleteByRows =
+        otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
+          visitor)
 
-    if (forceDeleteByRows) {
-      deleteRowsByCondition(sparkSession)
-    } else if (deletePredicate.isEmpty) {
-      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
-    } else {
-      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
-      if (deletePredicate.get.visit(visitor)) {
+      if (forceDeleteByRows) {
+        val commitMessages = if (withPrimaryKeys) {
+          performDeleteForPkTable(sparkSession)
+        } else {
+          performDeleteForNonPkTable(sparkSession)
+        }
+        writer.commit(commitMessages)
+      } else {
         val dropPartitions = visitor.partitions()
         commit.dropPartitions(
           Collections.singletonList(dropPartitions),
           BatchWriteBuilder.COMMIT_IDENTIFIER)
-      } else {
-        deleteRowsByCondition(sparkSession)
       }
     }
 
     Seq.empty[Row]
   }
 
-  private def deleteRowsByCondition(sparkSession: SparkSession): Unit = {
+  def performDeleteForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
     val df = createDataset(sparkSession, Filter(condition, relation))
       .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
+    writer.write(df)
+  }
+
+  def performDeleteForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+    // Step1: the candidate data splits which are filtered by Paimon Predicate.
+    val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
+    val fileNameToMeta = candidateFileMap(candidateDataSplits)
+
+    // Step2: extract exactly the files that must contain at least one record to be deleted.
+    val touchedFilePaths = findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
 
-    WriteIntoPaimonTable(table, InsertInto, df, new Options()).run(sparkSession)
+    // Step3: the smallest range of data files that need to be rewritten.
+    val touchedFiles = touchedFilePaths.map {
+      file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
+    }
+
+    // Step4: build a dataframe that contains the unchanged data, and write it out.
+    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+    val toRewriteScanRelation = Filter(
+      Not(condition),
+      Compatibility.createDataSourceV2ScanRelation(
+        relation,
+        PaimonSplitScan(table, touchedDataSplits),
+        relation.output))
+    val data = createDataset(sparkSession, toRewriteScanRelation)
+    val addCommitMessage = writer.write(data)
+
+    // Step5: convert the deleted files into commit messages.
+    val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+    addCommitMessage ++ deletedCommitMessage
   }
-}
 
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
-  extends DeleteFromPaimonTableCommandBase {
-  override def condition(): Expression = delete.condition
 }
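
Putting the command together, the delete path is chosen as follows: no condition (or a trivially true one) truncates the table; a condition made up solely of partition-key equality predicates that converts to a Paimon predicate drops the matching partitions via metadata only; everything else falls back to the row-level path (RowKind.DELETE rows for primary-key tables, a copy-on-write rewrite of touched files for append tables). An illustrative mapping, assuming T is an append table partitioned by dt:

    spark.sql("DELETE FROM T")                              // truncateTable
    spark.sql("DELETE FROM T WHERE dt = '2025'")            // dropPartitions, no data rewrite
    spark.sql("DELETE FROM T WHERE dt = '2025' AND id > 3") // rewrite touched files without the matching rows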
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 2c33b2c35..ffd09dcdb 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.spark.util.EncoderUtils
@@ -31,7 +30,7 @@ import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
 import org.apache.spark.sql.Utils._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BasePredicate, Expression, Literal, PredicateHelper, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BasePredicate, Expression, Literal, UnsafeProjection}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, Filter, InsertAction, LogicalPlan, MergeAction, UpdateAction}
@@ -48,9 +47,7 @@ case class MergeIntoPaimonTable(
     notMatchedActions: Seq[MergeAction],
     notMatchedBySourceActions: Seq[MergeAction])
   extends PaimonLeafRunnableCommand
-  with WithFileStoreTable
-  with ExpressionHelper
-  with PredicateHelper {
+  with PaimonCommand {
 
   import MergeIntoPaimonTable._
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index ba404704b..6a9afa9c2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,12 +18,30 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.index.IndexFileMeta
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
+import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter}
+import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
+import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowType
 
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Utils.createDataset
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
+import java.net.URI
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
 /** Helper trait for all paimon commands. */
 trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
 
@@ -68,4 +86,78 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
     value.isInstanceOf[Filter]
   }
 
+  /** Gets a relative path against the table path. */
+  protected def relativePath(absolutePath: String): String = {
+    val location = table.location().toUri
+    location.relativize(new URI(absolutePath)).toString
+  }
+
+  protected def findCandidateDataSplits(
+      condition: Expression,
+      output: Seq[Attribute]): Seq[DataSplit] = {
+    val snapshotReader = table.newSnapshotReader()
+    if (condition != TrueLiteral) {
+      val filter =
+        convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
+      filter.foreach(snapshotReader.withFilter)
+    }
+
+    snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
+  }
+
+  protected def findTouchedFiles(
+      candidateDataSplits: Seq[DataSplit],
+      condition: Expression,
+      relation: DataSourceV2Relation,
+      sparkSession: SparkSession): Array[String] = {
+    import sparkSession.implicits._
+
+    val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
+    val filteredRelation =
+      FilterLogicalNode(
+        condition,
+        Compatibility.createDataSourceV2ScanRelation(relation, scan, relation.output))
+    createDataset(sparkSession, filteredRelation)
+      .select(input_file_name())
+      .distinct()
+      .as[String]
+      .collect()
+      .map(relativePath)
+  }
+
+  protected def candidateFileMap(
+      candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
+    val totalBuckets = table.coreOptions().bucket()
+    val candidateDataFiles = candidateDataSplits
+      .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
+    val fileStorePathFactory = table.store().pathFactory()
+    candidateDataFiles
+      .map(file => (file.relativePath(fileStorePathFactory), file))
+      .toMap
+  }
+
+  protected def buildDeletedCommitMessage(
+      deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
+    deletedFiles
+      .groupBy(f => (f.partition, f.bucket))
+      .map {
+        case ((partition, bucket), files) =>
+          val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+
+          new CommitMessageImpl(
+            partition,
+            bucket,
+            new DataIncrement(
+              Collections.emptyList[DataFileMeta],
+              deletedDataFileMetas,
+              Collections.emptyList[DataFileMeta]),
+            new CompactIncrement(
+              Collections.emptyList[DataFileMeta],
+              Collections.emptyList[DataFileMeta],
+              Collections.emptyList[DataFileMeta]),
+            new IndexIncrement(Collections.emptyList[IndexFileMeta])
+          )
+      }
+      .toSeq
+  }
 }
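
The helper findTouchedFiles relies on Spark's input_file_name() function to discover which underlying data files contain at least one row matching the condition, so that only those files need to be rewritten. A standalone sketch of that trick using plain Parquet files instead of Paimon splits (paths, format and session settings are assumptions, not Paimon API):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.input_file_name

    val spark = SparkSession.builder().master("local[1]").appName("touched-files").getOrCreate()
    import spark.implicits._

    val dir = java.nio.file.Files.createTempDirectory("touched").toString
    Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name").write.mode("overwrite").parquet(dir)

    // Distinct source files holding at least one row that matches the delete condition.
    val touchedFiles = spark.read.parquet(dir)
      .where($"name" === "a")
      .select(input_file_name())
      .distinct()
      .as[String]
      .collect()

    // Paimon then maps these absolute paths back to relative paths and file metadata
    // (relativePath / candidateFileMap) and rewrites only those files.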
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index e9125e3e6..9d501e11f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConverters._
 
 case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TablePartitionSpec)
   extends PaimonLeafRunnableCommand
-  with PaimonCommand {
+  with WithFileStoreTable {
 
   override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 1c9fef77c..fbfe01a3f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -18,16 +18,12 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.index.IndexFileMeta
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
-import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
-import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.types.RowKind
 
 import org.apache.spark.sql.{Column, Row, SparkSession}
@@ -36,12 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
-import org.apache.spark.sql.functions.{input_file_name, lit}
-
-import java.net.URI
-import java.util.Collections
-
-import scala.collection.JavaConverters._
+import org.apache.spark.sql.functions.lit
 
 case class UpdatePaimonTableCommand(
     relation: DataSourceV2Relation,
@@ -84,40 +75,22 @@ case class UpdatePaimonTableCommand(
   /** Update for table without primary keys */
   private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
-    val candidateDataSplits = findCandidateDataSplits()
+    val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
+    val fileNameToMeta = candidateFileMap(candidateDataSplits)
 
     val commitMessages = if (candidateDataSplits.isEmpty) {
       // no data spilt need to be rewrote
       logDebug("No file need to rerote. It's an empty Commit.")
       Seq.empty[CommitMessage]
     } else {
-      import sparkSession.implicits._
-
-      // Step2: extract out the exactly files, which must contain record to be updated.
-      val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
-      val filteredRelation =
-        Filter(condition, DataSourceV2ScanRelation(relation, scan, relation.output))
-      val touchedFilePaths = createDataset(sparkSession, filteredRelation)
-        .select(input_file_name())
-        .distinct()
-        .as[String]
-        .collect()
-        .map(relativePath)
-
-      // Step3: build a new list of data splits which compose of those files.
-      // Those are expected to be the smallest range of data files that need to be rewritten.
-      val totalBuckets = table.coreOptions().bucket()
-      val candidateDataFiles = candidateDataSplits
-        .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
-      val fileStorePathFactory = table.store().pathFactory()
-      val fileNameToMeta =
-        candidateDataFiles
-          .map(file => (file.relativePath(fileStorePathFactory), file))
-          .toMap
-      val touchedFiles: Array[SparkDataFileMeta] = touchedFilePaths.map {
+      // Step2: extract exactly the files that must contain at least one record to be updated.
+      val touchedFilePaths =
+        findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
+
+      // Step3: the smallest range of data files that need to be rewritten.
+      val touchedFiles = touchedFilePaths.map {
         file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
       }
-      val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
 
       // Step4: build a dataframe that contains the unchanged and updated data, and write out them.
       val columns = updateExpressions.zip(relation.output).map {
@@ -129,6 +102,7 @@ case class UpdatePaimonTableCommand(
           }
           new Column(updated).as(origin.name, origin.metadata)
       }
+      val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
       val toUpdateScanRelation = DataSourceV2ScanRelation(
         relation,
         PaimonSplitScan(table, touchedDataSplits),
@@ -136,52 +110,12 @@ case class UpdatePaimonTableCommand(
       val data = createDataset(sparkSession, toUpdateScanRelation).select(columns: _*)
       val addCommitMessage = writer.write(data)
 
-      // Step5: convert the files that need to be wrote to commit message.
-      val deletedCommitMessage = touchedFiles
-        .groupBy(f => (f.partition, f.bucket))
-        .map {
-          case ((partition, bucket), files) =>
-            val bb = files.map(_.dataFileMeta).toList.asJava
-            val newFilesIncrement = new DataIncrement(
-              Collections.emptyList[DataFileMeta],
-              bb,
-              Collections.emptyList[DataFileMeta])
-            buildCommitMessage(
-              new CommitMessageImpl(partition, bucket, newFilesIncrement, null, null))
-        }
-        .toSeq
+      // Step5: convert the deleted files into commit messages.
+      val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
 
       addCommitMessage ++ deletedCommitMessage
     }
     commitMessages
   }
 
-  private def findCandidateDataSplits(): Seq[DataSplit] = {
-    val snapshotReader = table.newSnapshotReader()
-    if (condition != TrueLiteral) {
-      val filter =
-        convertConditionToPaimonPredicate(condition, relation.output, rowType, ignoreFailure = true)
-      filter.foreach(snapshotReader.withFilter)
-    }
-
-    snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
-  }
-
-  /** Gets a relative path against the table path. */
-  private def relativePath(absolutePath: String): String = {
-    val location = table.location().toUri
-    location.relativize(new URI(absolutePath)).toString
-  }
-
-  private def buildCommitMessage(o: CommitMessageImpl): CommitMessage = {
-    new CommitMessageImpl(
-      o.partition,
-      o.bucket,
-      o.newFilesIncrement,
-      new CompactIncrement(
-        Collections.emptyList[DataFileMeta],
-        Collections.emptyList[DataFileMeta],
-        Collections.emptyList[DataFileMeta]),
-      new IndexIncrement(Collections.emptyList[IndexFileMeta]));
-  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 78f31208d..59651cf1e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -35,8 +35,7 @@ class PaimonSink(
     outputMode: OutputMode,
     options: Options)
   extends Sink
-  with SchemaHelper
-  with PaimonCommand {
+  with SchemaHelper {
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     val saveMode = if (outputMode == OutputMode.Complete()) {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 3646cd73d..40eb9b9ad 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -26,15 +26,80 @@ import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
 abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
 
-  test(s"test delete from append only table") {
+  import testImplicits._
+
+  test(s"Paimon Delete: append-only table") {
     spark.sql(s"""
                  |CREATE TABLE T (id INT, name STRING, dt STRING)
                  |""".stripMargin)
 
-    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+                |""".stripMargin)
+
+    spark.sql("DELETE FROM T WHERE name = 'a'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF()
+    )
+
+    spark.sql("DELETE FROM T WHERE dt = '2025'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((2, "b", "2024")).toDF()
+    )
+  }
+
+  test(s"Paimon Delete: append-only table with partition") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025'),
+                |(5, 'a', '2026'), (6, 'b', '2026'), (7, 'c', '2027'), (8, 'd', '2027')
+                |""".stripMargin)
+
+    spark.sql("DELETE FROM T WHERE name = 'a'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq(
+        (2, "b", "2024"),
+        (3, "c", "2025"),
+        (4, "d", "2025"),
+        (6, "b", "2026"),
+        (7, "c", "2027"),
+        (8, "d", "2027")).toDF()
+    )
+
+    spark.sql("DELETE FROM T WHERE dt = '2025'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((2, "b", "2024"), (6, "b", "2026"), (7, "c", "2027"), (8, "d", "2027")).toDF()
+    )
+
+    spark.sql("DELETE FROM T WHERE dt IN ('2026', '2027')")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((2, "b", "2024")).toDF()
+    )
+  }
+
+  test("Paimon Delete: append-only table, condition contains subquery") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+                |""".stripMargin)
 
-    assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 'a'"))
-      .isInstanceOf(classOf[UnsupportedOperationException])
+    Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
+    assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id IN (SELECT * FROM updated_ids)"))
+      .hasMessageContaining("Subqueries are not supported")
   }
 
   CoreOptions.MergeEngine.values().foreach {