Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/03 20:54:08 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #2206: Spark: Support UPDATE statements with subqueries

rdblue commented on a change in pull request #2206:
URL: https://github.com/apache/iceberg/pull/2206#discussion_r569740629



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteUpdate.scala
##########
@@ -45,7 +50,37 @@ case class RewriteUpdate(spark: SparkSession) extends Rule[LogicalPlan] with Rew
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case UpdateTable(r: DataSourceV2Relation, assignments, Some(cond))
         if isIcebergRelation(r) && SubqueryExpression.hasSubquery(cond) =>
-      throw new AnalysisException("UPDATE statements with subqueries are not currently supported")
+
+      val writeInfo = newWriteInfo(r.schema)
+      val mergeBuilder = r.table.asMergeable.newMergeBuilder("update", writeInfo)
+
+      // since we are processing matched and not matched rows using separate jobs
+      // there will be two scans but we want to execute the dynamic file filter only once
+      // so the first job uses DynamicFileFilter and the second one uses the underlying scan plan
+      // both jobs share the same SparkMergeScan instance to ensure they operate on same files
+      val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
+      val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)
+      val underlyingScanPlan = scanPlan match {
+        case DynamicFileFilter(plan, _, _) => plan
+        case _ => scanPlan
+      }
+
+      // build a plan for records that match the cond and should be updated
+      val matchedRowsPlan = Filter(cond, scanPlan)
+      val updatedRowsPlan = buildUpdateProjection(r, matchedRowsPlan, assignments)
+
+      // TODO: Is it ok to use the same scan relation? Shall we clone it and reuse only Scan?

Review comment:
       Technically, we should clone the relation and reuse only the scan, so that the IDs used by different relations in the plan don't overlap. But since this rule runs after the analyzer, and we know we won't introduce any cases where the relations conflict (such as joining them), it should be okay. It is just not something we could do if we moved this upstream.
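
       To illustrate the concern, here is a toy sketch in plain Scala. It does not use Spark's classes: Attr, Relation, Filter, Union, Join, newInstance and hasConflictingIds are all hypothetical stand-ins, and it assumes that the IDs in question are per-attribute expression IDs and that the two branches are combined with something like a union. Under those assumptions, sharing one relation is harmless for a union but ambiguous for a join, and cloning with fresh IDs removes the ambiguity.

```scala
// Toy model only: none of these types are Spark's real classes.
object ExprIdSketch {
  // An output column identified by a unique ID (assumed analogue of an expression ID).
  final case class Attr(name: String, exprId: Long)

  sealed trait Plan { def output: Seq[Attr] }

  final case class Relation(output: Seq[Attr]) extends Plan

  final case class Filter(condition: String, child: Plan) extends Plan {
    // A filter passes its child's attributes through unchanged.
    def output: Seq[Attr] = child.output
  }

  final case class Union(left: Plan, right: Plan) extends Plan {
    // Assumption: a union exposes only one side's attributes,
    // so the two sides' IDs never appear together.
    def output: Seq[Attr] = left.output
  }

  final case class Join(left: Plan, right: Plan) extends Plan {
    // A join exposes both sides at once, so shared IDs become ambiguous.
    def output: Seq[Attr] = left.output ++ right.output
  }

  private val nextId = new java.util.concurrent.atomic.AtomicLong(0L)

  // Build a relation whose attributes get fresh IDs.
  def relation(names: String*): Relation =
    Relation(names.map(n => Attr(n, nextId.incrementAndGet())))

  // Clone a relation: same column names, fresh IDs.
  def newInstance(r: Relation): Relation =
    Relation(r.output.map(a => a.copy(exprId = nextId.incrementAndGet())))

  // True when the same ID appears more than once in the plan's output.
  def hasConflictingIds(plan: Plan): Boolean = {
    val ids = plan.output.map(_.exprId)
    ids.distinct.size != ids.size
  }
}
```

       In this model, Union(Filter("cond", rel), Filter("NOT cond", rel)) has no conflicting IDs even though both branches share rel, whereas Join over the same two branches does. Replacing one side with newInstance(rel) makes the join unambiguous again, which is roughly what cloning the relation would buy.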



