You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/29 13:15:27 UTC

[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections

singhpk234 commented on code in PR #5880:
URL: https://github.com/apache/iceberg/pull/5880#discussion_r983494220


##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -384,4 +388,55 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
     ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
+
+  private def buildWriteDeltaProjections(

Review Comment:
   [minor] Should we override this?



##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -384,4 +388,55 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
     ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
+
+  private def buildWriteDeltaProjections(
+      mergeRows: MergeRows,
+      rowAttrs: Seq[Attribute],
+      rowIdAttrs: Seq[Attribute],
+      metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {
+
+    val outputAttrs = mergeRows.output
+
+    val outputs = mergeRows.matchedOutputs ++ mergeRows.notMatchedOutputs
+    val insertAndUpdateOutputs = outputs.filterNot(_.head == Literal(DELETE_OPERATION))
+    val updateAndDeleteOutputs = outputs.filterNot(_.head == Literal(INSERT_OPERATION))
+
+    val rowProjection = if (rowAttrs.nonEmpty) {
+      Some(newLazyProjection(insertAndUpdateOutputs, outputAttrs, rowAttrs))
+    } else {
+      None
+    }
+
+    val rowIdProjection = newLazyProjection(updateAndDeleteOutputs, outputAttrs, rowIdAttrs)
+
+    val metadataProjection = if (metadataAttrs.nonEmpty) {
+      Some(newLazyProjection(updateAndDeleteOutputs, outputAttrs, metadataAttrs))
+    } else {
+      None
+    }
+
+    WriteDeltaProjections(rowProjection, rowIdProjection, metadataProjection)
+  }
+
+  // the projection is done by name, ignoring expr IDs
+  private def newLazyProjection(
+      outputs: Seq[Seq[Expression]],
+      outputAttrs: Seq[Attribute],
+      projectedAttrs: Seq[Attribute]): ProjectingInternalRow = {
+
+    val projectedOrdinals = projectedAttrs.map(attr => outputAttrs.indexWhere(_.name == attr.name))
+
+    val structFields = projectedAttrs.zip(projectedOrdinals).map { case (attr, ordinal) =>
+      // output attr is nullable if at least one action may produce null for that attr
+      // but row ID and metadata attrs are projected only in update/delete actions and
+      // row attrs are projected only in insert/update actions
+      // that's why the projection schema must rely only on relevant action outputs
+      // instead of blindly inheriting the output attr nullability

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org