You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/04/27 15:05:42 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7443: Spark 3.4: Remove no longer needed write extensions

aokolnychyi commented on code in PR #7443:
URL: https://github.com/apache/iceberg/pull/7443#discussion_r1179305016


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala:
##########
@@ -22,100 +22,40 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.Optional
 import java.util.UUID
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression
-import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
-import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.DeltaWriteBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite
-import org.apache.spark.sql.connector.write.SupportsOverwrite
-import org.apache.spark.sql.connector.write.SupportsTruncate
 import org.apache.spark.sql.connector.write.WriteBuilder
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.AlwaysTrue
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 /**
- * A rule that is inspired by V2Writes in Spark but supports Iceberg transforms.
+ * A rule that is inspired by V2Writes in Spark but supports Iceberg specific plans.
  */
 object ExtendedV2Writes extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) if isIcebergRelation(r) =>
-      val writeBuilder = newWriteBuilder(r.table, query.schema, options)
-      val write = writeBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      a.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None, _)
-        if isIcebergRelation(r) =>
-      // fail if any filter cannot be converted. correctness depends on removing all matching data.
-      val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
-        val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
-        if (filter.isEmpty) {
-          throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
-        }
-        filter
-      }.toArray
-
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsTruncate if isTruncate(filters) =>
-          builder.truncate().build()
-        case builder: SupportsOverwrite =>
-          builder.overwrite(filters).build()
-        case _ =>
-          throw QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
-      }
-
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None)
-        if isIcebergRelation(r) =>
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsDynamicOverwrite =>
-          builder.overwriteDynamicPartitions().build()
-        case _ =>
-          throw QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
-      }
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
     case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>

Review Comment:
   We have to keep the Iceberg-specific plans for row-level operations for now, because Spark's built-in plans don't support runtime filtering for UPDATE and MERGE. That support will land as part of Spark 3.5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org