Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/31 21:30:15 UTC

[GitHub] [iceberg] rdblue opened a new pull request #2017: Move RewriteDelete to the analyzer

rdblue opened a new pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017


   This is a work-in-progress that moves the `RewriteDelete` rule from the optimizer to the analyzer. The analyzer is a better fit for this so that more can be delegated to existing rules in Spark.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r550995957



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(

Review comment:
       Extending case classes is tricky.






[GitHub] [iceberg] aokolnychyi commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753906176


   I have mixed feelings about this direction.
   
   On one hand, it is great to rewrite this early in the analyzer and leave the optimizer untouched. On the other hand, it prohibits optimizations on delete/update/merge nodes and makes certain rewrites more complicated.
   
   Here is an example where having merge nodes in the optimizer may be helpful.
   
   If we rewrite the following operation in the analyzer, we will have to use a full outer join.
   
   ```
   MERGE target t USING source s
   ON t.id = s.id
   WHEN MATCHED AND exp_equivalent_to_false
     THEN UPDATE SET …
   WHEN NOT MATCHED
     THEN INSERT
   ```
   
   However, the optimizer can detect that our UPDATE condition always evaluates to false, so it can simplify the plan:
   
   ```
   MERGE target t USING source s
   ON t.id = s.id
   WHEN NOT MATCHED
     THEN INSERT
   ```
   
   The optimized merge operation can be executed using a left anti join instead of a full outer join. Similarly, if we can get rid of the NOT MATCHED clause, we can use a right outer join instead of a full outer join. There may be more cases like this that we don't know about yet.
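   
   A minimal sketch of the join selection described above, in plain Scala. The names `MergeJoinSketch`, `JoinType`, and `joinTypeFor` are hypothetical and are not Spark or Iceberg classes; the sketch assumes that always-false clauses have already been pruned and only records which kinds of clauses remain.
   
   ```scala
   object MergeJoinSketch {
     // Toy join types for illustration only (not Spark's JoinType hierarchy).
     sealed trait JoinType
     case object FullOuter extends JoinType
     case object RightOuter extends JoinType
     case object LeftAnti extends JoinType

     // Pick the narrowest join once we know which clause kinds are left.
     def joinTypeFor(hasMatched: Boolean, hasNotMatched: Boolean): Option[JoinType] =
       (hasMatched, hasNotMatched) match {
         case (true, true)   => Some(FullOuter)  // both MATCHED and NOT MATCHED actions remain
         case (true, false)  => Some(RightOuter) // only MATCHED actions remain
         case (false, true)  => Some(LeftAnti)   // only inserts remain: source rows without a match
         case (false, false) => None             // no clause left, nothing to join
       }
   }
   ```
   
   With the example above, pruning the always-false UPDATE clause leaves only NOT MATCHED, so `MergeJoinSketch.joinTypeFor(hasMatched = false, hasNotMatched = true)` selects the left anti join.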
   
   Also, a lot of rewrites happen in the optimizer, as @dilipbiswal mentioned, so the current approach isn't that bad in my view. In addition, row-level ops are a bit different from the refresh of materialized views, which is replaced with INSERTs. There is a clear translation for refresh statements and it does not require any optimization. Update/delete/merge commands are more complicated to me and I'll be fine addressing them differently.
   
   If we rewrite the plans after operator optimizations as we planned earlier, we miss these rules:
   - `PullupCorrelatedPredicates` (fixed in Spark 3.1 to cover delete/update/merge)
   - `OptimizeSubqueries` (works fine now as it runs on `SubqueryExpression` which is a child of the row-level plans)
   - Replacement of certain expressions with equivalent ones
   - Operator optimization rules (we match the behavior for subqueries)
   
   I think it is fine to run the rewrite after those rules.
   




[GitHub] [iceberg] dilipbiswal commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753422447


   @rdblue I see. I feel that we should have the ability to inject rules with more fine-grained control, which we seem to lack at this point. A PR, https://github.com/apache/spark/pull/23206, was proposed by Sunitha a while back but unfortunately didn't take off. If we had this ability, we could run the delete rewrite by specifying the order.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551262767



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {

Review comment:
       We should be careful as `MergeTable` won't extend all interfaces the original table implements. Some of them are optional like `SupportsMetadataColumns`.
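   
   A small, self-contained illustration of this concern in plain Scala. The traits below are toy stand-ins that only borrow the names `Table` and `SupportsMetadataColumns`; they are not the Spark interfaces, and `Wrapper` and `metadataColumnsOf` are hypothetical. The sketch shows that a wrapper which delegates to a table but does not declare an optional interface no longer matches code that checks for that interface.
   
   ```scala
   object WrapperSketch {
     // Toy stand-ins for the real interfaces (assumption: simplified signatures).
     trait Table { def name: String }
     trait SupportsMetadataColumns extends Table { def metadataColumns: Seq[String] }

     // The original table implements the optional interface.
     final class SourceTable extends Table with SupportsMetadataColumns {
       def name: String = "db.tbl"
       def metadataColumns: Seq[String] = Seq("_file", "_pos")
     }

     // The wrapper delegates `name` but does not declare the optional interface.
     final case class Wrapper(table: Table) extends Table {
       def name: String = table.name
     }

     // Stand-in for a rule that only fires for tables exposing metadata columns.
     def metadataColumnsOf(t: Table): Seq[String] = t match {
       case m: SupportsMetadataColumns => m.metadataColumns
       case _                          => Nil // wrapper falls through here
     }
   }
   ```
   
   Here `metadataColumnsOf(new SourceTable)` finds the columns, while `metadataColumnsOf(Wrapper(new SourceTable))` returns nothing even though the wrapped table supports them.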






[GitHub] [iceberg] aokolnychyi edited a comment on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753921549


   That being said, my opinion isn't final and I do support the intention of reducing the scope of the rewrite rule (e.g. moving the pushdown logic).
   
   I also like the concept of `MergeTable`. Should we also consider using it in the optimizer?




[GitHub] [iceberg] aokolnychyi commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753921549


   That being said, I do support the intention of reducing the scope of the rewrite rule (e.g. moving the pushdown logic).
   
   I also like the concept of `MergeTable`. Should we also consider using it in the optimizer?




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551262767



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {

Review comment:
       We should be careful as `MergeTable` won't extend all interfaces the original table implements. Some of them are optional like `SupportsMetadataColumns`. Some rules won't be triggered.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {

Review comment:
       We should be careful as `MergeTable` won't extend all interfaces the original table implements. Some of them are optional like `SupportsMetadataColumns`. Some rules may not be triggered.






[GitHub] [iceberg] rdblue commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-754277810


   > It prohibits optimizations on delete/update/merge nodes and makes certain rewrites more complicated.
   
   I don't agree that those optimizations are prohibited. I think they are still possible; the implementation would just be different. Your example would initially be converted to a full join producing rows for a merge node. But we could write a separate optimizer rule that matches a merge with a full join and checks whether the join type can be changed. Instead of doing the entire task in a single rewrite rule, we would have a rule to rewrite to the full join in the analyzer, then one to prune cases with `false` conditions, and then one to rewrite the join.
   
   I think that decomposing this into smaller independent rules is a good thing.
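   
   To make the decomposition concrete, here is a rough sketch in plain Scala. Everything in it is hypothetical (`SmallRulesSketch`, the toy `Merge` plan, and the three rule names); it does not use Catalyst and only illustrates chaining three small rules instead of one large rewrite.
   
   ```scala
   object SmallRulesSketch {
     // Toy conditions: either known to be false or opaque to the rules.
     sealed trait Cond
     case object AlwaysFalse extends Cond
     final case class Opaque(sql: String) extends Cond

     sealed trait Clause { def cond: Option[Cond] }
     final case class Matched(cond: Option[Cond]) extends Clause
     final case class NotMatched(cond: Option[Cond]) extends Clause

     // Toy merge plan: a join type label plus the remaining clauses.
     final case class Merge(joinType: String, clauses: Seq[Clause])

     type Rule = Merge => Merge

     // Rule 1: the early rewrite always starts from a full outer join.
     val rewriteToFullJoin: Rule = m => m.copy(joinType = "full_outer")

     // Rule 2: drop clauses whose condition is known to be false.
     val pruneFalseClauses: Rule =
       m => m.copy(clauses = m.clauses.filterNot(_.cond.contains(AlwaysFalse)))

     // Rule 3: narrow the join type based on the clauses that are left.
     val narrowJoin: Rule = m => {
       val hasMatched = m.clauses.exists(_.isInstanceOf[Matched])
       val hasNotMatched = m.clauses.exists(_.isInstanceOf[NotMatched])
       (hasMatched, hasNotMatched) match {
         case (false, true) => m.copy(joinType = "left_anti")
         case (true, false) => m.copy(joinType = "right_outer")
         case _             => m
       }
     }

     // Apply the rules in order, each one independent of the others.
     val pipeline: Rule = Function.chain(Seq(rewriteToFullJoin, pruneFalseClauses, narrowJoin))
   }
   ```
   
   Each rule can be tested on its own, and the ordering is explicit in `pipeline` rather than buried inside a single rewrite.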
   
   > If we rewrite the plans after operator optimizations as we planned earlier, we miss ...
   
   We would also miss analyzer rules. In our Spark build, we have support for metadata columns and that's how we project `_file` and `_pos`. The problem is that the rule that adds metadata columns to the logical plan runs in the analyzer, so we have to add them to the projection ourselves in the optimizer rewrite rule. I think that there are quite a few tasks and optimizations that have to be done in this rule because it needs to run late.
   
   We're accumulating quite a list of rules that won't run on these plans by doing the rewrite in the optimizer. And, we lose more depending on what batch we use in the optimizer. If the rewrite is done in operator optimization, then there's no guarantee that expressions are fully optimized before this rule -- that could be a problem for the expr-equivalent-to-false example. On the other hand, expressions produced by the rule would get optimized, unless we move it after operator optimization. If we do that, then the `Not(cond)` that we introduce might be `Not(Not(...))` or other cases that would be simplified by the optimizer, and columns in the dynamic filter plan are not pruned.
   
   I think overall I'm leaning toward rewriting in the analyzer. I think we can still build the same optimizations, just in different ways. That said, we probably don't need to really consider this until after the next release. Getting something working is the first step. Then we can talk about refactoring into separate rules.




[GitHub] [iceberg] rdblue commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r552114587



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {

Review comment:
       Yes. Sorry I missed replying to this yesterday. Because we are creating two separate relations, we get two separate scans.
   
   I'm not sure that we can or should avoid this in Spark. We are already violating assumptions about the plan (that scans aren't reused) and I don't really see a workable way to create a scan using the normal early pushdown rule and use it in both places. We could probably work it out in Iceberg though.






[GitHub] [iceberg] rdblue commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551059701



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {
       case filterable: SupportsFileFilter =>
-        val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
-        dynamicFileFilter
+        val filterRelation = tableRelation.copy(output = addFileAndPos(tableRelation.output))
+        val filteredFiles = new FilterFiles()
+        val matchingFilePlan = buildFileFilterPlan(cond, filterRelation)
+        filterable.filterFiles(filteredFiles)
+        DynamicFileFilter(mergeRelation, matchingFilePlan, filteredFiles)
+
       case _ =>
-        scanRelation
+        mergeRelation
     }
   }
 
   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
 
-    val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
-    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val fileNameCol = UnresolvedAttribute(FILE_NAME_COL)
+    val rowPosCol = UnresolvedAttribute(ROW_POS_COL)
     val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
     val numShufflePartitions = SQLConf.get.numShufflePartitions
     val repartition = RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
     val sort = Sort(order, global = false, repartition)
     Project(output, sort)
   }
 
-  private def isMetadataDelete(relation: DataSourceV2Relation, cond: Expression): Boolean = {
-    relation.table match {
-      case t: ExtendedSupportsDelete if !SubqueryExpression.hasSubquery(cond) =>
-        val predicates = splitConjunctivePredicates(cond)
-        val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, relation.output)
-        val dataSourceFilters = toDataSourceFilters(normalizedPredicates)
-        val allPredicatesTranslated = normalizedPredicates.size == dataSourceFilters.length
-        allPredicatesTranslated && t.canDeleteWhere(dataSourceFilters)
-      case _ => false
-    }
-  }
-
-  private def toDataSourceFilters(predicates: Seq[Expression]): Array[sources.Filter] = {
-    predicates.flatMap { p =>
-      val translatedFilter = DataSourceStrategy.translateFilter(p, supportNestedPredicatePushdown = true)
-      if (translatedFilter.isEmpty) {
-        logWarning(s"Cannot translate expression to source filter: $p")
-      }
-      translatedFilter
-    }.toArray
-  }
-
-  private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
-    val uuid = UUID.randomUUID()
-    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
-  }
-
-  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2ScanRelation): LogicalPlan = {
+  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2Relation): LogicalPlan = {
     val matchingFilter = Filter(cond, scanRelation)
-    val fileAttr = findOutputAttr(matchingFilter, FILE_NAME_COL)
+    val fileAttr = UnresolvedAttribute(FILE_NAME_COL)
     val agg = Aggregate(Seq(fileAttr), Seq(fileAttr), matchingFilter)
-    Project(Seq(findOutputAttr(agg, FILE_NAME_COL)), agg)
+    Project(Seq(UnresolvedAttribute(FILE_NAME_COL)), agg)
   }
 
-  private def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
-    val resolver = SQLConf.get.resolver
-    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
-      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+  private def not(expr: Expression): Expression = {

Review comment:
       This is to get some of the tests working. There's a check that there are no NOT IN or NOT EXISTS subqueries in complex expressions, and things like `Not(Not(Exists(...)))` get rejected. In Spark, we'd be able to solve that problem by updating the check to run after optimization, but for now I've just done a minor rewrite to push down the not and simplify.
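   
   The body of the helper is not shown in this hunk, so the following is only a sketch of the general idea on a toy expression tree, not the code in the PR. `NotPushdownSketch` and its case classes are hypothetical stand-ins for Catalyst expressions; the sketch cancels double negation and pushes `Not` through `And`/`Or` using De Morgan's laws.
   
   ```scala
   object NotPushdownSketch {
     // Toy expression tree (assumption: not Catalyst's Expression classes).
     sealed trait Expr
     final case class Pred(name: String) extends Expr       // opaque predicate
     final case class Exists(subquery: String) extends Expr // opaque subquery
     final case class Not(child: Expr) extends Expr
     final case class And(left: Expr, right: Expr) extends Expr
     final case class Or(left: Expr, right: Expr) extends Expr

     // Negate an expression, pushing the negation toward the leaves.
     def not(expr: Expr): Expr = expr match {
       case Not(child) => simplify(child)      // Not(Not(x)) becomes x
       case And(l, r)  => Or(not(l), not(r))   // De Morgan
       case Or(l, r)   => And(not(l), not(r))  // De Morgan
       case leaf       => Not(leaf)
     }

     // Remove any remaining nested negations without changing the meaning.
     def simplify(expr: Expr): Expr = expr match {
       case Not(child) => not(child)
       case And(l, r)  => And(simplify(l), simplify(r))
       case Or(l, r)   => Or(simplify(l), simplify(r))
       case leaf       => leaf
     }
   }
   ```
   
   With this shape, negating `Not(Exists(...))` yields the bare `Exists(...)` instead of a `Not(Not(Exists(...)))` that the check would reject.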






[GitHub] [iceberg] dilipbiswal commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753219119


   @rdblue Hmm, in my understanding the analyzer mostly concerns itself with resolving attributes, and the optimizer does the work of optimizing the logical plan, which includes rewrites? Many rewrites that used to happen in the analyzer, such as subquery rewrites, have been moved to the optimizer because of this principle.




[GitHub] [iceberg] rdblue commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551059915



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {
       case filterable: SupportsFileFilter =>
-        val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
-        dynamicFileFilter
+        val filterRelation = tableRelation.copy(output = addFileAndPos(tableRelation.output))
+        val filteredFiles = new FilterFiles()
+        val matchingFilePlan = buildFileFilterPlan(cond, filterRelation)
+        filterable.filterFiles(filteredFiles)
+        DynamicFileFilter(mergeRelation, matchingFilePlan, filteredFiles)
+
       case _ =>
-        scanRelation
+        mergeRelation
     }
   }
 
   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
 
-    val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
-    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val fileNameCol = UnresolvedAttribute(FILE_NAME_COL)
+    val rowPosCol = UnresolvedAttribute(ROW_POS_COL)
     val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
     val numShufflePartitions = SQLConf.get.numShufflePartitions
     val repartition = RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
     val sort = Sort(order, global = false, repartition)
     Project(output, sort)
   }
 
-  private def isMetadataDelete(relation: DataSourceV2Relation, cond: Expression): Boolean = {
-    relation.table match {
-      case t: ExtendedSupportsDelete if !SubqueryExpression.hasSubquery(cond) =>
-        val predicates = splitConjunctivePredicates(cond)
-        val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, relation.output)
-        val dataSourceFilters = toDataSourceFilters(normalizedPredicates)
-        val allPredicatesTranslated = normalizedPredicates.size == dataSourceFilters.length
-        allPredicatesTranslated && t.canDeleteWhere(dataSourceFilters)
-      case _ => false
-    }
-  }
-
-  private def toDataSourceFilters(predicates: Seq[Expression]): Array[sources.Filter] = {
-    predicates.flatMap { p =>
-      val translatedFilter = DataSourceStrategy.translateFilter(p, supportNestedPredicatePushdown = true)
-      if (translatedFilter.isEmpty) {
-        logWarning(s"Cannot translate expression to source filter: $p")
-      }
-      translatedFilter
-    }.toArray
-  }
-
-  private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
-    val uuid = UUID.randomUUID()
-    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
-  }
-
-  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2ScanRelation): LogicalPlan = {
+  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2Relation): LogicalPlan = {
     val matchingFilter = Filter(cond, scanRelation)
-    val fileAttr = findOutputAttr(matchingFilter, FILE_NAME_COL)
+    val fileAttr = UnresolvedAttribute(FILE_NAME_COL)
     val agg = Aggregate(Seq(fileAttr), Seq(fileAttr), matchingFilter)
-    Project(Seq(findOutputAttr(agg, FILE_NAME_COL)), agg)
+    Project(Seq(UnresolvedAttribute(FILE_NAME_COL)), agg)
   }
 
-  private def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
-    val resolver = SQLConf.get.resolver
-    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
-      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+  private def not(expr: Expression): Expression = {

Review comment:
       I should also note that this is why the tests are different. The analysis check catches either the original condition in the `DeleteFrom` node (which is there to convert back to a metadata rewrite in the optimizer rule) or the negated condition in the rewrite plan.






[GitHub] [iceberg] rdblue commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551059941



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(

Review comment:
       Yeah, this is hacky. If we decided to do this then we would want to adjust some of the implementation.






[GitHub] [iceberg] aokolnychyi commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-754533819


   > I don't agree that those optimizations are prohibited. I think they are still possible, it would just make the implementation different.
   
   By prohibited I mean that we won't have separate delete/update/merge nodes after the rewrite (as far as I understand). Therefore, equivalent optimizations will be harder. For example, a rule to drop a branch on `MergeIntoTable` would be trivial if we run it before the rewrite. Doing the same optimization after the rewrite is possible but would be harder.
   
   > The problem is that the analyzer rule to add metadata columns to the logical plan runs in the analyzer
   
   This adds another perspective. We could make it work with row-level nodes too if we had a concept of `_row_id` that a data source would return.
   
   > And, we lose more depending on what batch we use in the optimizer.
   
   I think the initial plan was to do this after operator optimization to make sure conditions are optimal. However, I agree that we won't run the operator optimization batch (and potentially some other optimizer rules) after the rewrite, which is probably my biggest concern about rewriting in the optimizer.
   
   Overall, I believe we should think through how the rewrite in the analyzer will work, how to avoid job planning twice, do extra optimization later, etc. If we can come up with solutions to those problems, I think it will be safer. 
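
   To make the "drop a branch before the rewrite" point concrete, here is a minimal sketch on a toy merge node. `MergeNode`, `MergeAction`, `AlwaysFalse`, and `DropDeadBranches` are hypothetical names for illustration, not Spark's `MergeIntoTable` API; the only point is that while the merge is still a single node, removing a branch is a one-line filter.

```scala
// Toy model of a merge node that still carries its branches explicitly.
// All names here are hypothetical; this is not Spark's MergeIntoTable.
sealed trait MergeCond
case object AlwaysFalse extends MergeCond        // a condition known to never match
case class Check(sql: String) extends MergeCond  // any other condition, kept opaque

case class MergeAction(kind: String, cond: MergeCond)
case class MergeNode(target: String, actions: Seq[MergeAction])

object DropDeadBranches {
  // Remove every branch whose condition can never be true.
  def apply(merge: MergeNode): MergeNode =
    merge.copy(actions = merge.actions.filterNot(_.cond == AlwaysFalse))
}
```

   After the node has been rewritten into joins, filters, and projections, the same branch would have to be recognized from the shape of the plan, which is the harder case described above.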
   




[GitHub] [iceberg] rdblue commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r552116721



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ConvertMetadataDelete.scala
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFrom
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.iceberg.catalog.ExtendedSupportsDelete
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.BooleanType
+
+object ConvertMetadataDelete extends Rule[LogicalPlan] with PredicateHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case DeleteFrom(relation: DataSourceV2Relation, Some(Literal(const, BooleanType)), _, _) =>

Review comment:
       `None` in this case signals that this rule should not be applied. The problem I was hitting was that some queries with subquery conditions could be rewritten, but would be rejected by the analysis checks if the condition was still present in the plan. So I remove subqueries that can't be done in metadata.
   
   I think the real problem is that Spark should reject those later, rather than only allowing some nodes to contain subqueries.
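
       The role of `None` can be sketched with a toy model. Everything below (`Cond`, `Simple`, `WithSubquery`, `MetadataDeleteSketch`, and the string results) is hypothetical and only mirrors the shape of the logic described in this comment: the condition is kept on the rewritten node only when it has no subquery, and a later rule attempts the metadata-only path only when a condition was kept.

```scala
// Toy delete conditions; illustrative stand-ins, not Catalyst expressions.
sealed trait Cond
case class Simple(filter: String) extends Cond     // translatable to source filters
case class WithSubquery(sql: String) extends Cond  // contains a subquery

object MetadataDeleteSketch {
  def hasSubquery(cond: Cond): Boolean = cond match {
    case _: WithSubquery => true
    case _               => false
  }

  // Analyzer side: keep the original condition only if it has no subquery.
  // None signals that the metadata-delete conversion should not be attempted.
  def retainedCondition(cond: Cond): Option[Cond] =
    if (hasSubquery(cond)) None else Some(cond)

  // Optimizer side: convert only when a condition was retained and the
  // table (modelled here as a function) accepts it for a metadata delete.
  def plan(retained: Option[Cond], canDeleteWhere: Cond => Boolean): String =
    retained match {
      case Some(c) if canDeleteWhere(c) => "metadata-delete"
      case _                            => "rewrite-files"
    }
}
```

       In this sketch a subquery condition never reaches the second step, so it cannot trip a check that only looks at the condition stored on the node.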






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551262005



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {

Review comment:
       @rdblue, does this mean we will do the job planning twice?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r550994640



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ConvertMetadataDelete.scala
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFrom
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.iceberg.catalog.ExtendedSupportsDelete
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.BooleanType
+
+object ConvertMetadataDelete extends Rule[LogicalPlan] with PredicateHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case DeleteFrom(relation: DataSourceV2Relation, Some(Literal(const, BooleanType)), _, _) =>

Review comment:
       Deletes without conditions can be answered using metadata only too.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r550998490



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {
       case filterable: SupportsFileFilter =>
-        val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
-        dynamicFileFilter
+        val filterRelation = tableRelation.copy(output = addFileAndPos(tableRelation.output))
+        val filteredFiles = new FilterFiles()
+        val matchingFilePlan = buildFileFilterPlan(cond, filterRelation)
+        filterable.filterFiles(filteredFiles)
+        DynamicFileFilter(mergeRelation, matchingFilePlan, filteredFiles)
+
       case _ =>
-        scanRelation
+        mergeRelation
     }
   }
 
   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
 
-    val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
-    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val fileNameCol = UnresolvedAttribute(FILE_NAME_COL)
+    val rowPosCol = UnresolvedAttribute(ROW_POS_COL)
     val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
     val numShufflePartitions = SQLConf.get.numShufflePartitions
     val repartition = RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
     val sort = Sort(order, global = false, repartition)
     Project(output, sort)
   }
 
-  private def isMetadataDelete(relation: DataSourceV2Relation, cond: Expression): Boolean = {
-    relation.table match {
-      case t: ExtendedSupportsDelete if !SubqueryExpression.hasSubquery(cond) =>
-        val predicates = splitConjunctivePredicates(cond)
-        val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, relation.output)
-        val dataSourceFilters = toDataSourceFilters(normalizedPredicates)
-        val allPredicatesTranslated = normalizedPredicates.size == dataSourceFilters.length
-        allPredicatesTranslated && t.canDeleteWhere(dataSourceFilters)
-      case _ => false
-    }
-  }
-
-  private def toDataSourceFilters(predicates: Seq[Expression]): Array[sources.Filter] = {
-    predicates.flatMap { p =>
-      val translatedFilter = DataSourceStrategy.translateFilter(p, supportNestedPredicatePushdown = true)
-      if (translatedFilter.isEmpty) {
-        logWarning(s"Cannot translate expression to source filter: $p")
-      }
-      translatedFilter
-    }.toArray
-  }
-
-  private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
-    val uuid = UUID.randomUUID()
-    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
-  }
-
-  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2ScanRelation): LogicalPlan = {
+  private def buildFileFilterPlan(cond: Expression, scanRelation: DataSourceV2Relation): LogicalPlan = {
     val matchingFilter = Filter(cond, scanRelation)
-    val fileAttr = findOutputAttr(matchingFilter, FILE_NAME_COL)
+    val fileAttr = UnresolvedAttribute(FILE_NAME_COL)
     val agg = Aggregate(Seq(fileAttr), Seq(fileAttr), matchingFilter)
-    Project(Seq(findOutputAttr(agg, FILE_NAME_COL)), agg)
+    Project(Seq(UnresolvedAttribute(FILE_NAME_COL)), agg)
   }
 
-  private def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
-    val resolver = SQLConf.get.resolver
-    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
-      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+  private def not(expr: Expression): Expression = {

Review comment:
       Why do we have to change this?
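
For context on the question: the line this diff removes built the remaining-rows filter as `Not(EqualNullSafe(cond, Literal(true, BooleanType)))`. The sketch below is a toy model in plain Scala, not Spark's expression classes, and every name in it is made up for illustration. It shows why that expression differs from a plain `NOT`: a delete removes only rows where the condition is true, so rows where the condition evaluates to NULL have to be kept, and a plain `NOT` inside a filter would drop them. What the new `not` helper does is not visible in this hunk.

```scala
object NullSafeNot {
  // SQL booleans are three-valued: Some(true), Some(false), or None for NULL.
  type SqlBool = Option[Boolean]

  // Plain SQL NOT: NULL stays NULL.
  def not(b: SqlBool): SqlBool = b.map(!_)

  // cond <=> true: null-safe equality never returns NULL.
  def eqNullSafeTrue(b: SqlBool): Boolean = b.contains(true)

  // A filter keeps a row only when its predicate is exactly true.
  def keptByFilter(b: SqlBool): Boolean = b.contains(true)

  // Rows kept by a filter on NOT cond.
  def keptByPlainNot(cond: SqlBool): Boolean = keptByFilter(not(cond))

  // Rows kept by a filter on NOT (cond <=> true).
  def keptByNullSafeNot(cond: SqlBool): Boolean = !eqNullSafeTrue(cond)
}
```

With a NULL condition, `keptByPlainNot(None)` is false while `keptByNullSafeNot(None)` is true, so only the null-safe form leaves that row in the table.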






[GitHub] [iceberg] rdblue commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551059460



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {
       case filterable: SupportsFileFilter =>
-        val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
-        dynamicFileFilter
+        val filterRelation = tableRelation.copy(output = addFileAndPos(tableRelation.output))
+        val filteredFiles = new FilterFiles()
+        val matchingFilePlan = buildFileFilterPlan(cond, filterRelation)
+        filterable.filterFiles(filteredFiles)
+        DynamicFileFilter(mergeRelation, matchingFilePlan, filteredFiles)
+
       case _ =>
-        scanRelation
+        mergeRelation
     }
   }
 
   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
 
-    val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
-    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val fileNameCol = UnresolvedAttribute(FILE_NAME_COL)

Review comment:
       Yes, we should only trigger when the delete node is resolved. That should also take care of the case where I had to remove a subquery alias above.
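
A minimal sketch of what "only trigger when the delete node is resolved" could look like, written as a toy model in plain Scala rather than against Spark's classes. The types `UnresolvedTable`, `ResolvedTable`, `Delete`, and `RewrittenDelete` are hypothetical stand-ins; in the actual rule the equivalent would presumably be a guard on the plan's `resolved` flag in the pattern match.

```scala
object ResolvedGuard {
  sealed trait Plan { def resolved: Boolean }

  case class UnresolvedTable(name: String) extends Plan { val resolved = false }
  case class ResolvedTable(name: String) extends Plan { val resolved = true }

  // A delete is resolved only once its target table is resolved.
  case class Delete(table: Plan, cond: String) extends Plan {
    def resolved: Boolean = table.resolved
  }

  // Stand-in for the rewritten plan: keep the rows that do not match.
  case class RewrittenDelete(table: Plan, keepWhere: String) extends Plan {
    def resolved: Boolean = table.resolved
  }

  // The guard leaves unresolved deletes untouched, so any analysis error
  // is reported against the plan the user actually wrote.
  def rewrite(plan: Plan): Plan = plan match {
    case d @ Delete(table, cond) if d.resolved =>
      RewrittenDelete(table, s"NOT ($cond)")
    case other =>
      other
  }
}
```

An unresolved `Delete` passes through unchanged and can be picked up on a later pass once its table has been resolved.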






[GitHub] [iceberg] dilipbiswal edited a comment on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
dilipbiswal edited a comment on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753219119


   @rdblue Hmm, in my understanding the analyzer mostly concerns itself with resolving attributes, while the optimizer does the work of optimizing the logical plan, which includes rewrites? Many rewrites that used to happen in the analyzer, such as subquery rewrites, were moved to the optimizer because of this principle. Other examples are the rewrites for set operations like `except/intersect` and `except all/intersect all`, which happen during the optimization phase today.




[GitHub] [iceberg] rdblue commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-753391385


   @dilipbiswal, I agree with you and that's why I think that it makes the most sense for this to be in the analyzer. We want all of those rewrite rules to be applied by the optimizer, rather than trying to do everything in this rule.
   
   The rewrite needs to run before subquery rewrites and optimizations are applied, but it currently runs in the operator optimization batch that is after OptimizeSubqueries, PullupCorrelatedPredicates, and others. By doing the initial rewrite in the analyzer, I think we can delegate more work to the optimizer so we can make this rewrite simple and have it optimized later.
   
   I think that the analyzer is a good place to run rewrites that produce a concrete logical plan for some action, like delete from. Another example is materialized view refresh. Ideally, we would produce a logical plan that then gets analyzed and optimized like a query that was written by hand.
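
The ordering argument above can be sketched with a toy two-phase pipeline. This is plain Scala with made-up node types (`Delete`, `ReplaceData`, `AntiJoin`, and so on), not Spark's Catalyst classes, and it ignores the null-aware semantics a real `NOT IN` rewrite needs. The point it illustrates is that the delete rewrite stays small and knows nothing about subqueries, because a generic optimizer-phase rule handles them afterward.

```scala
object RuleOrdering {
  sealed trait Cond
  case class InSubquery(column: String, subquery: Plan) extends Cond
  case class Not(child: Cond) extends Cond

  sealed trait Plan
  case class Scan(table: String) extends Plan
  case class Filter(cond: Cond, child: Plan) extends Plan
  case class AntiJoin(left: Plan, right: Plan, column: String) extends Plan
  case class Delete(table: String, cond: Cond) extends Plan
  case class ReplaceData(table: String, rows: Plan) extends Plan

  // Analyzer-phase rule: express the delete as "write back the rows that remain".
  def rewriteDelete(plan: Plan): Plan = plan match {
    case Delete(table, cond) => ReplaceData(table, Filter(Not(cond), Scan(table)))
    case other => other
  }

  // Optimizer-phase rule: a generic subquery rewrite with no delete-specific logic.
  def rewriteSubquery(plan: Plan): Plan = plan match {
    case ReplaceData(table, rows) =>
      ReplaceData(table, rewriteSubquery(rows))
    case Filter(Not(InSubquery(column, sub)), child) =>
      AntiJoin(rewriteSubquery(child), sub, column)
    case Filter(cond, child) =>
      Filter(cond, rewriteSubquery(child))
    case other =>
      other
  }

  // Analyzer first, optimizer second.
  def run(plan: Plan): Plan = rewriteSubquery(rewriteDelete(plan))
}
```

Running a delete whose condition is an `InSubquery` through `run` yields a `ReplaceData` over an `AntiJoin`, even though `rewriteDelete` never inspects the condition.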




[GitHub] [iceberg] dilipbiswal commented on a change in pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on a change in pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#discussion_r551040546



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -66,117 +75,120 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
   private val FILE_NAME_COL = "_file"
   private val ROW_POS_COL = "_pos"
 
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
-    case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isMetadataDelete(r, cond) =>
-      d
+  private case class MergeTable(
+      table: Table with SupportsMerge,
+      operation: String) extends Table with SupportsRead with SupportsWrite {
+    val mergeBuilder: MergeBuilder = table.newMergeBuilder(operation, newWriteInfo(table.schema))
+
+    override def name: String = table.name
+    override def schema: StructType = table.schema
+    override def partitioning: Array[Transform] = table.partitioning
+    override def properties: util.Map[String, String] = table.properties
+    override def capabilities: util.Set[TableCapability] = table.capabilities
+    override def toString: String = table.toString
+
+    // TODO: refactor merge builder to accept options and info after construction
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = mergeBuilder.asScanBuilder()
+    override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = mergeBuilder.asWriteBuilder()
+
+    private def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+      val uuid = UUID.randomUUID()
+      LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+    }
+  }
 
-    // rewrite all operations that require reading the table to delete records
-    case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
-      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
-      val writeInfo = newWriteInfo(r.schema)
-      val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)
+  private class DeletableMergeTable(
+      table: Table with SupportsMerge with ExtendedSupportsDelete,
+      operation: String) extends MergeTable(table, operation) with ExtendedSupportsDelete {
+    override def canDeleteWhere(filters: Array[sources.Filter]): Boolean = table.canDeleteWhere(filters)
+    override def deleteWhere(filters: Array[sources.Filter]): Unit = table.deleteWhere(filters)
+  }
 
-      val scanPlan = buildScanPlan(r.table, r.output, mergeBuilder, cond)
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case DeleteFromTable(r, Some(cond)) =>
+      // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite
+      val relation = r.collectFirst {
+        case v2: DataSourceV2Relation =>
+          v2
+      }.get
+
+      val mergeTable = relation.table match {
+        case withDelete: Table with SupportsMerge with ExtendedSupportsDelete =>
+          new DeletableMergeTable(withDelete, "delete")
+        case _ =>
+          MergeTable(relation.table.asMergeable, "delete")
+      }
+      val scanPlan = buildScanPlan(mergeTable, relation, cond)
 
-      val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
+      val remainingRowFilter = not(cond)
       val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
 
-      val mergeWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-      val writePlan = buildWritePlan(remainingRowsPlan, r.output)
-      ReplaceData(r, mergeWrite, writePlan)
+      val writePlan = buildWritePlan(remainingRowsPlan, relation.output)
+      val writeRelation = relation.copy(table = mergeTable, output = addFileAndPos(relation.output))
+
+      if (SubqueryExpression.hasSubquery(cond)) {
+        DeleteFrom(writeRelation, None, writePlan, None)
+      } else {
+        DeleteFrom(writeRelation, Some(cond), writePlan, None)
+      }
   }
 
   private def buildScanPlan(
-      table: Table,
-      output: Seq[AttributeReference],
-      mergeBuilder: MergeBuilder,
+      mergeTable: MergeTable,
+      tableRelation: DataSourceV2Relation,
       cond: Expression): LogicalPlan = {
+    val mergeRelation = tableRelation.copy(table = mergeTable, output = addFileAndPos(tableRelation.output))
 
-    val scanBuilder = mergeBuilder.asScanBuilder
-
-    val predicates = splitConjunctivePredicates(cond)
-    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
-    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
-
-    val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
-
-    scan match {
+    mergeTable.mergeBuilder match {
       case filterable: SupportsFileFilter =>
-        val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
-        dynamicFileFilter
+        val filterRelation = tableRelation.copy(output = addFileAndPos(tableRelation.output))
+        val filteredFiles = new FilterFiles()
+        val matchingFilePlan = buildFileFilterPlan(cond, filterRelation)
+        filterable.filterFiles(filteredFiles)
+        DynamicFileFilter(mergeRelation, matchingFilePlan, filteredFiles)
+
       case _ =>
-        scanRelation
+        mergeRelation
     }
   }
 
   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
 
-    val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
-    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val fileNameCol = UnresolvedAttribute(FILE_NAME_COL)

Review comment:
       @rdblue If we keep this as an analysis rule, should we trigger it only when the `delete` node is resolved? My reasoning is that if we report an analysis failure on the rewritten plan, it may confuse the user, since that plan may look very different from their original query.






[GitHub] [iceberg] dilipbiswal commented on pull request #2017: Move RewriteDelete to the analyzer

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on pull request #2017:
URL: https://github.com/apache/iceberg/pull/2017#issuecomment-754112570


   @aokolnychyi
   Nice explanation of the `early rewrite` in the analyzer. One other thing I worry about is that introducing new plan nodes can sometimes get in the way of existing optimization rules, like pushing down filters or collapsing projects. That said, we may be okay for `delete` in its present form, but we may have to worry about this in the future as the rule evolves.
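
To make the concern concrete, here is a toy filter-pushdown rule in plain Scala. The node types are hypothetical (`CustomNode` stands in for any new plan node a rewrite might introduce) and the rule is far simpler than Spark's real pushdown, for example it does not check which columns a filter references. It only shows how a rule with no case for a new node leaves the filter stuck above it.

```scala
object PushdownBlocked {
  sealed trait Plan
  case class Scan(table: String, pushed: List[String] = Nil) extends Plan
  case class Project(columns: List[String], child: Plan) extends Plan
  case class Filter(cond: String, child: Plan) extends Plan
  // Hypothetical node added by a rewrite; pushDown cannot move filters through it.
  case class CustomNode(child: Plan) extends Plan

  def pushDown(plan: Plan): Plan = plan match {
    // A filter directly over a scan is handed to the scan.
    case Filter(cond, Scan(table, pushed)) =>
      Scan(table, pushed :+ cond)
    // The rule knows how to move a filter below a projection.
    case Filter(cond, Project(columns, child)) =>
      Project(columns, pushDown(Filter(cond, child)))
    // Any other child, including CustomNode, keeps the filter where it is.
    case Filter(cond, child) =>
      Filter(cond, pushDown(child))
    case Project(columns, child) =>
      Project(columns, pushDown(child))
    case CustomNode(child) =>
      CustomNode(pushDown(child))
    case scan: Scan =>
      scan
  }
}
```

A filter over `Project` reaches the scan, while the same filter over `CustomNode` stays above it and the scan receives no pushed predicate.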

