Posted to commits@paimon.apache.org by lz...@apache.org on 2024/04/03 04:06:33 UTC
(paimon) branch master updated: [spark] Support DELETE for append table (#3140)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 329ca2b3c [spark] Support DELETE for append table (#3140)
329ca2b3c is described below
commit 329ca2b3c697b4fc97cc8d70a662c5e7c74944af
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Wed Apr 3 12:06:28 2024 +0800
[spark] Support DELETE for append table (#3140)
---
.../Compatibility.scala} | 19 ++--
.../catalyst/analysis/PaimonDeleteTable.scala | 18 +++-
.../paimon/spark/catalyst/Compatibility.scala} | 19 ++--
.../paimon/spark/catalyst/Compatibility.scala} | 19 ++--
.../catalyst/analysis/PaimonDeleteTable.scala | 17 ++-
.../spark/catalyst/analysis/RowLevelOp.scala | 2 +-
.../analysis/expressions/ExpressionHelper.scala | 11 ++
.../commands/DeleteFromPaimonTableCommand.scala | 118 ++++++++++++++-------
.../spark/commands/MergeIntoPaimonTable.scala | 7 +-
.../paimon/spark/commands/PaimonCommand.scala | 94 +++++++++++++++-
.../commands/PaimonTruncateTableCommand.scala | 2 +-
.../spark/commands/UpdatePaimonTableCommand.scala | 92 +++-------------
.../apache/paimon/spark/sources/PaimonSink.scala | 3 +-
.../paimon/spark/sql/DeleteFromTableTest.scala | 73 ++++++++++++-
14 files changed, 342 insertions(+), 152 deletions(-)
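In practical terms, this commit lets a Spark SQL DELETE statement run against a Paimon append-only (non-primary-key) table. A minimal usage sketch, using the table shape from the new DeleteFromTableTest cases at the end of this diff:

spark.sql("CREATE TABLE T (id INT, name STRING, dt STRING)")
spark.sql("INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024')")

// Previously this threw UnsupportedOperationException for append-only
// tables; with this change the touched data files are rewritten without
// the deleted rows.
spark.sql("DELETE FROM T WHERE name = 'a'")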
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
similarity index 61%
copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
copy to paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b131384de..a86c9186b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -16,14 +16,19 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst
-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {
+
+ def createDataSourceV2ScanRelation(
+ relation: DataSourceV2Relation,
+ scan: Scan,
+ output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+ DataSourceV2ScanRelation(relation, scan, output)
+ }
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
- extends DeleteFromPaimonTableCommandBase {
- override def condition(): Expression = delete.condition.orNull
}
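The Compatibility shim above is copied into each Spark-version module because the DataSourceV2ScanRelation constructor gained extra parameters in newer Spark releases (key-grouped partitioning, for one); funneling construction through a single method keeps the shared command code version-agnostic. A sketch of the intended call pattern, mirroring the call site added in DeleteFromPaimonTableCommand further down (touchedDataSplits is illustrative):

// Build a scan relation over a set of touched Paimon splits without
// depending on the Spark-version-specific constructor arity.
val scanRelation = Compatibility.createDataSourceV2ScanRelation(
  relation,                                  // DataSourceV2Relation of the table
  PaimonSplitScan(table, touchedDataSplits), // scan restricted to touched splits
  relation.output)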
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
similarity index 66%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
copy to paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index f2800d742..7da432d56 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -19,10 +19,14 @@
package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import scala.collection.JavaConverters._
+
object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
override val operation: RowLevelOp = Delete
@@ -32,7 +36,19 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
checkPaimonTable(table.getTable)
- DeleteFromPaimonTableCommand(table, d)
+ table.getTable match {
+ case paimonTable: FileStoreTable =>
+ val primaryKeys = paimonTable.primaryKeys().asScala
+ if (primaryKeys.isEmpty) {
+ condition.foreach(checkSubquery)
+ }
+
+ val relation = PaimonRelation.getPaimonRelation(d.table)
+ DeleteFromPaimonTableCommand(relation, paimonTable, condition.getOrElse(TrueLiteral))
+
+ case _ =>
+ throw new RuntimeException("Delete Operation is only supported for FileStoreTable.")
+ }
}
}
}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
similarity index 61%
copy from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
copy to paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b131384de..a86c9186b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -16,14 +16,19 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst
-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {
+
+ def createDataSourceV2ScanRelation(
+ relation: DataSourceV2Relation,
+ scan: Scan,
+ output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+ DataSourceV2ScanRelation(relation, scan, output)
+ }
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
- extends DeleteFromPaimonTableCommandBase {
- override def condition(): Expression = delete.condition.orNull
}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
similarity index 61%
rename from paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b131384de..a86c9186b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -16,14 +16,19 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst
-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {
+
+ def createDataSourceV2ScanRelation(
+ relation: DataSourceV2Relation,
+ scan: Scan,
+ output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+ DataSourceV2ScanRelation(relation, scan, output)
+ }
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
- extends DeleteFromPaimonTableCommandBase {
- override def condition(): Expression = delete.condition.orNull
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index f2800d742..7b0f0cb64 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -19,10 +19,13 @@
package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
+import scala.collection.JavaConverters._
+
object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
override val operation: RowLevelOp = Delete
@@ -32,7 +35,19 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
checkPaimonTable(table.getTable)
- DeleteFromPaimonTableCommand(table, d)
+ table.getTable match {
+ case paimonTable: FileStoreTable =>
+ val primaryKeys = paimonTable.primaryKeys().asScala
+ if (primaryKeys.isEmpty) {
+ checkSubquery(condition)
+ }
+
+ val relation = PaimonRelation.getPaimonRelation(d.table)
+ DeleteFromPaimonTableCommand(relation, paimonTable, condition)
+
+ case _ =>
+ throw new RuntimeException("Delete Operation is only supported for FileStoreTable.")
+ }
}
}
}
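At the plan level, this rule now rewrites a resolved DeleteFromTable over a Paimon relation into DeleteFromPaimonTableCommand(relation, fileStoreTable, condition), and for tables without primary keys it rejects subqueries in the condition during analysis rather than at execution time. An illustrative failure case, matching the new test at the end of this commit (assumes spark.implicits._ in scope):

// For an append-only (no primary key) table, a subquery in the DELETE
// condition now fails fast in the analyzer.
Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
spark.sql("DELETE FROM T WHERE id IN (SELECT * FROM updated_ids)")
// => fails with a message containing "Subqueries are not supported"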
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
index f83cf91d8..2f5892a02 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
@@ -48,7 +48,7 @@ case object Delete extends RowLevelOp {
override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)
- override val supportAppendOnlyTable: Boolean = false
+ override val supportAppendOnlyTable: Boolean = true
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 8657c70ad..4e7a8109b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -112,6 +112,17 @@ trait ExpressionHelper extends PredicateHelper {
}
}
+ def splitPruePartitionAndOtherPredicates(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ resolver: Resolver): (Seq[Expression], Seq[Expression]) = {
+ splitConjunctivePredicates(condition)
+ .partition {
+ isPredicatePartitionColumnsOnly(_, partitionColumns, resolver) && !SubqueryExpression
+ .hasSubquery(condition)
+ }
+ }
+
def isPredicatePartitionColumnsOnly(
condition: Expression,
partitionColumns: Seq[String],
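The new helper splits a conjunctive condition into predicates that reference only partition columns (and contain no subquery) versus everything else; the delete command uses the left side to attempt a metadata-only path. A hedged sketch of its behavior for a table T partitioned by dt, with illustrative names and a resolved condition taken from an analyzed plan:

val df = spark.table("T")
val condition = (df("dt") === "2024" && df("name") === "a").expr

val (partitionPart, otherPart) = splitPruePartitionAndOtherPredicates(
  condition,
  partitionColumns = Seq("dt"),
  resolver = spark.sessionState.conf.resolver)
// partitionPart contains dt = '2024'; otherPart contains name = 'a'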
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 84922abbc..bd73c8fc5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,77 +18,123 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
-import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.Utils.createDataset
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.functions.lit
import java.util.{Collections, UUID}
-import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
+import scala.util.Try
-trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with PaimonCommand {
- self: DeleteFromPaimonTableCommand =>
- override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
+case class DeleteFromPaimonTableCommand(
+ relation: DataSourceV2Relation,
+ override val table: FileStoreTable,
+ condition: Expression)
+ extends PaimonLeafRunnableCommand
+ with PaimonCommand
+ with ExpressionHelper
+ with SupportsSubquery {
- private val relation = delete.table
+ private lazy val writer = PaimonSparkWriter(table)
- def condition(): Expression
+ override def run(sparkSession: SparkSession): Seq[Row] = {
- private lazy val (deletePredicate, forceDeleteByRows) =
+ val commit = table.store.newCommit(UUID.randomUUID.toString)
if (condition == null || condition == TrueLiteral) {
- (None, false)
+ commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
} else {
- try {
- (convertConditionToPaimonPredicate(condition(), relation.output, table.rowType()), false)
- } catch {
- case NonFatal(_) =>
- (None, true)
+ val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
+ condition,
+ table.partitionKeys().asScala,
+ sparkSession.sessionState.conf.resolver)
+
+ // TODO: provide another partition visitor to support more partition predicates.
+ val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
+ val partitionPredicate = if (partitionCondition.isEmpty) {
+ None
+ } else {
+ convertConditionToPaimonPredicate(
+ partitionCondition.reduce(And),
+ relation.output,
+ rowType,
+ ignoreFailure = true)
}
- }
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val commit = table.store.newCommit(UUID.randomUUID.toString)
+ // We do not have to scan the table if the following three requirements are met:
+ // 1) no other predicate;
+ // 2) the partition condition can be converted to a Paimon predicate;
+ // 3) the partition predicate can be visited by OnlyPartitionKeyEqualVisitor.
+ val forceDeleteByRows =
+ otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
+ visitor)
- if (forceDeleteByRows) {
- deleteRowsByCondition(sparkSession)
- } else if (deletePredicate.isEmpty) {
- commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
- } else {
- val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
- if (deletePredicate.get.visit(visitor)) {
+ if (forceDeleteByRows) {
+ val commitMessages = if (withPrimaryKeys) {
+ performDeleteForPkTable(sparkSession)
+ } else {
+ performDeleteForNonPkTable(sparkSession)
+ }
+ writer.commit(commitMessages)
+ } else {
val dropPartitions = visitor.partitions()
commit.dropPartitions(
Collections.singletonList(dropPartitions),
BatchWriteBuilder.COMMIT_IDENTIFIER)
- } else {
- deleteRowsByCondition(sparkSession)
}
}
Seq.empty[Row]
}
- private def deleteRowsByCondition(sparkSession: SparkSession): Unit = {
+ def performDeleteForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
val df = createDataset(sparkSession, Filter(condition, relation))
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
+ writer.write(df)
+ }
+
+ def performDeleteForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+ // Step1: the candidate data splits which are filtered by Paimon Predicate.
+ val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
+ val fileNameToMeta = candidateFileMap(candidateDataSplits)
+
+ // Step2: extract the exact files, which must contain at least one record to be deleted.
+ val touchedFilePaths = findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
- WriteIntoPaimonTable(table, InsertInto, df, new Options()).run(sparkSession)
+ // Step3: the smallest range of data files that need to be rewritten.
+ val touchedFiles = touchedFilePaths.map {
+ file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
+ }
+
+ // Step4: build a dataframe that contains the unchanged data, and write it out.
+ val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+ val toRewriteScanRelation = Filter(
+ Not(condition),
+ Compatibility.createDataSourceV2ScanRelation(
+ relation,
+ PaimonSplitScan(table, touchedDataSplits),
+ relation.output))
+ val data = createDataset(sparkSession, toRewriteScanRelation)
+ val addCommitMessage = writer.write(data)
+
+ // Step5: convert the deleted files into commit messages.
+ val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+ addCommitMessage ++ deletedCommitMessage
}
-}
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
- extends DeleteFromPaimonTableCommandBase {
- override def condition(): Expression = delete.condition
}
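Putting the new run() together: the command picks the cheapest strategy the condition allows. A sketch of which branch each statement takes for a table T partitioned by dt (illustrative, matching the tests below):

spark.sql("DELETE FROM T")                    // truncateTable: no scan, no rewrite
spark.sql("DELETE FROM T WHERE dt = '2025'")  // dropPartitions: metadata-only
spark.sql("DELETE FROM T WHERE name = 'a'")   // forceDeleteByRows: find touched
                                              // files, rewrite them without the
                                              // deleted rows, commit add + delete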
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 2c33b2c35..ffd09dcdb 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.spark.util.EncoderUtils
@@ -31,7 +30,7 @@ import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.Utils._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BasePredicate, Expression, Literal, PredicateHelper, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, BasePredicate, Expression, Literal, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, Filter, InsertAction, LogicalPlan, MergeAction, UpdateAction}
@@ -48,9 +47,7 @@ case class MergeIntoPaimonTable(
notMatchedActions: Seq[MergeAction],
notMatchedBySourceActions: Seq[MergeAction])
extends PaimonLeafRunnableCommand
- with WithFileStoreTable
- with ExpressionHelper
- with PredicateHelper {
+ with PaimonCommand {
import MergeIntoPaimonTable._
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index ba404704b..6a9afa9c2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,12 +18,30 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.index.IndexFileMeta
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
+import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter}
+import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
+import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowType
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Utils.createDataset
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
+import java.net.URI
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
@@ -68,4 +86,78 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
value.isInstanceOf[Filter]
}
+ /** Gets a relative path against the table path. */
+ protected def relativePath(absolutePath: String): String = {
+ val location = table.location().toUri
+ location.relativize(new URI(absolutePath)).toString
+ }
+
+ protected def findCandidateDataSplits(
+ condition: Expression,
+ output: Seq[Attribute]): Seq[DataSplit] = {
+ val snapshotReader = table.newSnapshotReader()
+ if (condition == TrueLiteral) {
+ val filter =
+ convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
+ filter.foreach(snapshotReader.withFilter)
+ }
+
+ snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
+ }
+
+ protected def findTouchedFiles(
+ candidateDataSplits: Seq[DataSplit],
+ condition: Expression,
+ relation: DataSourceV2Relation,
+ sparkSession: SparkSession): Array[String] = {
+ import sparkSession.implicits._
+
+ val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
+ val filteredRelation =
+ FilterLogicalNode(
+ condition,
+ Compatibility.createDataSourceV2ScanRelation(relation, scan, relation.output))
+ createDataset(sparkSession, filteredRelation)
+ .select(input_file_name())
+ .distinct()
+ .as[String]
+ .collect()
+ .map(relativePath)
+ }
+
+ protected def candidateFileMap(
+ candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
+ val totalBuckets = table.coreOptions().bucket()
+ val candidateDataFiles = candidateDataSplits
+ .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
+ val fileStorePathFactory = table.store().pathFactory()
+ candidateDataFiles
+ .map(file => (file.relativePath(fileStorePathFactory), file))
+ .toMap
+ }
+
+ protected def buildDeletedCommitMessage(
+ deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
+ deletedFiles
+ .groupBy(f => (f.partition, f.bucket))
+ .map {
+ case ((partition, bucket), files) =>
+ val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ new DataIncrement(
+ Collections.emptyList[DataFileMeta],
+ deletedDataFileMetas,
+ Collections.emptyList[DataFileMeta]),
+ new CompactIncrement(
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta]),
+ new IndexIncrement(Collections.emptyList[IndexFileMeta])
+ )
+ }
+ .toSeq
+ }
}
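The heart of the non-PK delete path is findTouchedFiles: it narrows the candidate splits down to the files that actually contain at least one matching row by filtering and collecting input_file_name(). The same trick in plain Spark terms (a sketch; df stands for a DataFrame over the candidate splits):

import org.apache.spark.sql.functions.input_file_name
import sparkSession.implicits._

val touchedFiles: Array[String] = df
  .filter("name = 'a'")        // the DELETE condition
  .select(input_file_name())   // absolute file path of each matching row
  .distinct()
  .as[String]
  .collect()
  .map(relativePath)           // normalize against the table location

buildDeletedCommitMessage then wraps those files into delete-only CommitMessages (with empty new-file and compact increments), which are committed together with the rewritten data.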
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index e9125e3e6..9d501e11f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConverters._
case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TablePartitionSpec)
extends PaimonLeafRunnableCommand
- with PaimonCommand {
+ with WithFileStoreTable {
override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 1c9fef77c..fbfe01a3f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -18,16 +18,12 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.index.IndexFileMeta
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
-import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
-import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.types.RowKind
import org.apache.spark.sql.{Column, Row, SparkSession}
@@ -36,12 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
-import org.apache.spark.sql.functions.{input_file_name, lit}
-
-import java.net.URI
-import java.util.Collections
-
-import scala.collection.JavaConverters._
+import org.apache.spark.sql.functions.lit
case class UpdatePaimonTableCommand(
relation: DataSourceV2Relation,
@@ -84,40 +75,22 @@ case class UpdatePaimonTableCommand(
/** Update for table without primary keys */
private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
// Step1: the candidate data splits which are filtered by Paimon Predicate.
- val candidateDataSplits = findCandidateDataSplits()
+ val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
+ val fileNameToMeta = candidateFileMap(candidateDataSplits)
val commitMessages = if (candidateDataSplits.isEmpty) {
// no data split needs to be rewritten
logDebug("No file needs to be rewritten. It's an empty commit.")
Seq.empty[CommitMessage]
} else {
- import sparkSession.implicits._
-
- // Step2: extract out the exactly files, which must contain record to be updated.
- val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
- val filteredRelation =
- Filter(condition, DataSourceV2ScanRelation(relation, scan, relation.output))
- val touchedFilePaths = createDataset(sparkSession, filteredRelation)
- .select(input_file_name())
- .distinct()
- .as[String]
- .collect()
- .map(relativePath)
-
- // Step3: build a new list of data splits which compose of those files.
- // Those are expected to be the smallest range of data files that need to be rewritten.
- val totalBuckets = table.coreOptions().bucket()
- val candidateDataFiles = candidateDataSplits
- .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
- val fileStorePathFactory = table.store().pathFactory()
- val fileNameToMeta =
- candidateDataFiles
- .map(file => (file.relativePath(fileStorePathFactory), file))
- .toMap
- val touchedFiles: Array[SparkDataFileMeta] = touchedFilePaths.map {
+ // Step2: extract the exact files, which must contain at least one record to delete.
+ val touchedFilePaths =
+ findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
+
+ // Step3: the smallest range of data files that need to be rewritten.
+ val touchedFiles = touchedFilePaths.map {
file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
}
- val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
// Step4: build a dataframe that contains the unchanged and updated data, and write them out.
val columns = updateExpressions.zip(relation.output).map {
@@ -129,6 +102,7 @@ case class UpdatePaimonTableCommand(
}
new Column(updated).as(origin.name, origin.metadata)
}
+ val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
val toUpdateScanRelation = DataSourceV2ScanRelation(
relation,
PaimonSplitScan(table, touchedDataSplits),
@@ -136,52 +110,12 @@ case class UpdatePaimonTableCommand(
val data = createDataset(sparkSession, toUpdateScanRelation).select(columns: _*)
val addCommitMessage = writer.write(data)
- // Step5: convert the files that need to be wrote to commit message.
- val deletedCommitMessage = touchedFiles
- .groupBy(f => (f.partition, f.bucket))
- .map {
- case ((partition, bucket), files) =>
- val bb = files.map(_.dataFileMeta).toList.asJava
- val newFilesIncrement = new DataIncrement(
- Collections.emptyList[DataFileMeta],
- bb,
- Collections.emptyList[DataFileMeta])
- buildCommitMessage(
- new CommitMessageImpl(partition, bucket, newFilesIncrement, null, null))
- }
- .toSeq
+ // Step5: convert the deleted files into commit messages.
+ val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
addCommitMessage ++ deletedCommitMessage
}
commitMessages
}
- private def findCandidateDataSplits(): Seq[DataSplit] = {
- val snapshotReader = table.newSnapshotReader()
- if (condition != TrueLiteral) {
- val filter =
- convertConditionToPaimonPredicate(condition, relation.output, rowType, ignoreFailure = true)
- filter.foreach(snapshotReader.withFilter)
- }
-
- snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
- }
-
- /** Gets a relative path against the table path. */
- private def relativePath(absolutePath: String): String = {
- val location = table.location().toUri
- location.relativize(new URI(absolutePath)).toString
- }
-
- private def buildCommitMessage(o: CommitMessageImpl): CommitMessage = {
- new CommitMessageImpl(
- o.partition,
- o.bucket,
- o.newFilesIncrement,
- new CompactIncrement(
- Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta]),
- new IndexIncrement(Collections.emptyList[IndexFileMeta]));
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 78f31208d..59651cf1e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -35,8 +35,7 @@ class PaimonSink(
outputMode: OutputMode,
options: Options)
extends Sink
- with SchemaHelper
- with PaimonCommand {
+ with SchemaHelper {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val saveMode = if (outputMode == OutputMode.Complete()) {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 3646cd73d..40eb9b9ad 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -26,15 +26,80 @@ import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
- test(s"test delete from append only table") {
+ import testImplicits._
+
+ test(s"Paimon Delete: append-only table") {
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING, dt STRING)
|""".stripMargin)
- spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+ |""".stripMargin)
+
+ spark.sql("DELETE FROM T WHERE name = 'a'")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF()
+ )
+
+ spark.sql("DELETE FROM T WHERE dt = '2025'")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((2, "b", "2024")).toDF()
+ )
+ }
+
+ test(s"Paimon Delete: append-only table with partition") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
+ |""".stripMargin)
+
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025'),
+ |(5, 'a', '2026'), (6, 'b', '2026'), (7, 'c', '2027'), (8, 'd', '2027')
+ |""".stripMargin)
+
+ spark.sql("DELETE FROM T WHERE name = 'a'")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq(
+ (2, "b", "2024"),
+ (3, "c", "2025"),
+ (4, "d", "2025"),
+ (6, "b", "2026"),
+ (7, "c", "2027"),
+ (8, "d", "2027")).toDF()
+ )
+
+ spark.sql("DELETE FROM T WHERE dt = '2025'")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((2, "b", "2024"), (6, "b", "2026"), (7, "c", "2027"), (8, "d", "2027")).toDF()
+ )
+
+ spark.sql("DELETE FROM T WHERE dt IN ('2026', '2027')")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((2, "b", "2024")).toDF()
+ )
+ }
+
+ test("Paimon Delete: append-only table, condition contains subquery") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
+ |""".stripMargin)
+
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+ |""".stripMargin)
- assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 'a'"))
- .isInstanceOf(classOf[UnsupportedOperationException])
+ Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
+ assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id IN (SELECT * FROM updated_ids)"))
+ .hasMessageContaining("Subqueries are not supported")
}
CoreOptions.MergeEngine.values().foreach {