You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/09/28 14:35:09 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections

aokolnychyi opened a new pull request, #5880:
URL: https://github.com/apache/iceberg/pull/5880

   This PR fixes the second part of #5739 and is an alternative to #5789. Instead of reporting a wrong output nullability for merged output attributes, this change fixes the schema used in lazy projections for row, row ID and metadata attributes. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi closed pull request #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections

Posted by GitBox <gi...@apache.org>.
aokolnychyi closed pull request #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections
URL: https://github.com/apache/iceberg/pull/5880


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5880:
URL: https://github.com/apache/iceberg/pull/5880#issuecomment-1261015298

   @mohitgargk @Fokko @singhpk234 @ChaladiMohanVamsi, could you take a look at this way of fixing the problem?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5880:
URL: https://github.com/apache/iceberg/pull/5880#discussion_r983579590


##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -384,4 +388,55 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
     ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
+
+  private def buildWriteDeltaProjections(

Review Comment:
   Actually, Scala does not treat it as an override. I can rename to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5880: Spark 3.2, 3.3: Fix nullability in merge-on-read projections

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5880:
URL: https://github.com/apache/iceberg/pull/5880#discussion_r983494220


##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -384,4 +388,55 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
     ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
+
+  private def buildWriteDeltaProjections(

Review Comment:
   [minor] should we override this ? 



##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -384,4 +388,55 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand {
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
     ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
+
+  private def buildWriteDeltaProjections(
+      mergeRows: MergeRows,
+      rowAttrs: Seq[Attribute],
+      rowIdAttrs: Seq[Attribute],
+      metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {
+
+    val outputAttrs = mergeRows.output
+
+    val outputs = mergeRows.matchedOutputs ++ mergeRows.notMatchedOutputs
+    val insertAndUpdateOutputs = outputs.filterNot(_.head == Literal(DELETE_OPERATION))
+    val updateAndDeleteOutputs = outputs.filterNot(_.head == Literal(INSERT_OPERATION))
+
+    val rowProjection = if (rowAttrs.nonEmpty) {
+      Some(newLazyProjection(insertAndUpdateOutputs, outputAttrs, rowAttrs))
+    } else {
+      None
+    }
+
+    val rowIdProjection = newLazyProjection(updateAndDeleteOutputs, outputAttrs, rowIdAttrs)
+
+    val metadataProjection = if (metadataAttrs.nonEmpty) {
+      Some(newLazyProjection(updateAndDeleteOutputs, outputAttrs, metadataAttrs))
+    } else {
+      None
+    }
+
+    WriteDeltaProjections(rowProjection, rowIdProjection, metadataProjection)
+  }
+
+  // the projection is done by name, ignoring expr IDs
+  private def newLazyProjection(
+      outputs: Seq[Seq[Expression]],
+      outputAttrs: Seq[Attribute],
+      projectedAttrs: Seq[Attribute]): ProjectingInternalRow = {
+
+    val projectedOrdinals = projectedAttrs.map(attr => outputAttrs.indexWhere(_.name == attr.name))
+
+    val structFields = projectedAttrs.zip(projectedOrdinals).map { case (attr, ordinal) =>
+      // output attr is nullable if at least one action may produce null for that attr
+      // but row ID and metadata attrs are projected only in update/delete actions and
+      // row attrs are projected only in insert/update actions
+      // that's why the projection schema must rely only on relevant action outputs
+      // instead of blindly inheriting the output attr nullability

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org