You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/04/27 15:05:42 UTC
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7443: Spark 3.4: Remove no longer needed write extensions
aokolnychyi commented on code in PR #7443:
URL: https://github.com/apache/iceberg/pull/7443#discussion_r1179305016
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala:
##########
@@ -22,100 +22,40 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.Optional
import java.util.UUID
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression
-import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
-import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.write.DeltaWriteBuilder
import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite
-import org.apache.spark.sql.connector.write.SupportsOverwrite
-import org.apache.spark.sql.connector.write.SupportsTruncate
import org.apache.spark.sql.connector.write.WriteBuilder
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.AlwaysTrue
-import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
/**
- * A rule that is inspired by V2Writes in Spark but supports Iceberg transforms.
+ * A rule that is inspired by V2Writes in Spark but supports Iceberg-specific plans.
*/
object ExtendedV2Writes extends Rule[LogicalPlan] with PredicateHelper {
import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) if isIcebergRelation(r) =>
- val writeBuilder = newWriteBuilder(r.table, query.schema, options)
- val write = writeBuilder.build()
- val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
- a.copy(write = Some(write), query = newQuery)
-
- case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None, _)
- if isIcebergRelation(r) =>
- // fail if any filter cannot be converted. correctness depends on removing all matching data.
- val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
- val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
- if (filter.isEmpty) {
- throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
- }
- filter
- }.toArray
-
- val table = r.table
- val writeBuilder = newWriteBuilder(table, query.schema, options)
- val write = writeBuilder match {
- case builder: SupportsTruncate if isTruncate(filters) =>
- builder.truncate().build()
- case builder: SupportsOverwrite =>
- builder.overwrite(filters).build()
- case _ =>
- throw QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
- }
-
- val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
- o.copy(write = Some(write), query = newQuery)
-
- case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None)
- if isIcebergRelation(r) =>
- val table = r.table
- val writeBuilder = newWriteBuilder(table, query.schema, options)
- val write = writeBuilder match {
- case builder: SupportsDynamicOverwrite =>
- builder.overwriteDynamicPartitions().build()
- case _ =>
- throw QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
- }
- val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
- o.copy(write = Some(write), query = newQuery)
-
case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>
Review Comment:
We have to keep the Iceberg-specific plans for row-level operations for now, as Spark's built-in plans don't support runtime filtering for UPDATE and MERGE. That support will be part of Spark 3.5.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org