You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/13 22:44:55 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #4047: Spark 3.2: Implement merge-on-read MERGE

rdblue commented on a change in pull request #4047:
URL: https://github.com/apache/iceberg/pull/4047#discussion_r805426755



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows can be discarded
+    // use right outer join in all other cases, unmatched source rows may be needed
+    // also disable broadcasts for the target table to perform the cardinality check later
+    val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
+    val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)
+
+    val deleteRowValues = buildDeltaDeleteRowValues(rowAttrs, rowIdAttrs)
+    val metadataReadAttrs = readAttrs.filterNot(relation.outputSet.contains)
+
+    val matchedConditions = matchedActions.map(actionCondition)
+    val matchedOutputs = matchedActions.map { action =>

Review comment:
       Nit: in the `ReplaceData` code path, the equivalent mapping is a one-liner. You could do the same thing here:
   
   ```scala
       val matchedOutputs = matchedActions.map(deltaActionOutput(_, deleteRowValues, metadataReadAttrs))
   ```
   
   Not a big deal; I'm comparing the two implementations side by side and looking for differences, so this stood out as a little odd. The same applies to `notMatchedOutputs`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org