Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/03 20:50:07 UTC

[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2206: Spark: Support UPDATE statements with subqueries

RussellSpitzer commented on a change in pull request #2206:
URL: https://github.com/apache/iceberg/pull/2206#discussion_r569738272



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
##########
@@ -45,7 +50,37 @@ case class RewriteUpdate(spark: SparkSession) extends Rule[LogicalPlan] with Rew
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case UpdateTable(r: DataSourceV2Relation, assignments, Some(cond))
         if isIcebergRelation(r) && SubqueryExpression.hasSubquery(cond) =>
-      throw new AnalysisException("UPDATE statements with subqueries are not currently supported")
+
+      val writeInfo = newWriteInfo(r.schema)
+      val mergeBuilder = r.table.asMergeable.newMergeBuilder("update", writeInfo)
+
+      // since we are processing matched and not matched rows using separate jobs
+      // there will be two scans but we want to execute the dynamic file filter only once
+      // so the first job uses DynamicFileFilter and the second one uses the underlying scan plan
+      // both jobs share the same SparkMergeScan instance to ensure they operate on same files
+      val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
+      val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)
+      val underlyingScanPlan = scanPlan match {
+        case DynamicFileFilter(plan, _, _) => plan
+        case _ => scanPlan
+      }
+
+      // build a plan for records that match the cond and should be updated
+      val matchedRowsPlan = Filter(cond, scanPlan)
+      val updatedRowsPlan = buildUpdateProjection(r, matchedRowsPlan, assignments)
+
+      // TODO: Is it ok to use the same scan relation? Shall we clone it and reuse only Scan?

Review comment:
       The ScanRelation should be immutable, so I don't think it's a problem to pass the same instance to multiple plans.
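
       A minimal sketch of the reasoning in the comment above, assuming plan nodes behave like immutable case classes. `Plan`, `Scan`, `Filter`, `Project`, and `SharedScanSketch` here are hypothetical stand-ins written for illustration, not Spark's or Iceberg's real classes, and the table and file names are made up:

```scala
// Hypothetical stand-ins for plan nodes; NOT Spark's actual Catalyst classes.
sealed trait Plan
final case class Scan(table: String, files: Vector[String]) extends Plan
final case class Filter(condition: String, child: Plan) extends Plan
final case class Project(columns: Vector[String], child: Plan) extends Plan

object SharedScanSketch {
  // One immutable scan node, reused as the child of two separate plans.
  val scan: Scan = Scan("db.tbl", Vector("f1.parquet", "f2.parquet"))

  val matchedRows: Plan = Filter("cond", scan)
  val remainingRows: Plan = Filter("NOT cond", scan)

  // "Changing" a node means building a new copy; the shared original is untouched.
  val updatedRows: Plan = Project(Vector("id", "value"), matchedRows)
  val narrowedScan: Scan = scan.copy(files = scan.files.take(1))
}
```

       Because nothing in the sketch can mutate `scan` after construction, neither parent plan can affect what the other one sees. This only illustrates the immutability argument; it says nothing about state held by the underlying Scan object that the TODO also asks about.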



