Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/23 17:23:39 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #7691: Spark 3.4: Codegen support for UpdateRowsExec

aokolnychyi opened a new pull request, #7691:
URL: https://github.com/apache/iceberg/pull/7691

   This PR adds codegen support for the newly added `UpdateRowsExec` node that splits updates into deletes and inserts. It is a follow-up to #7646, which added this node, and is needed to stay on par with the initial implementation that used a projection.
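   
   As a conceptual sketch (hypothetical schema and record layout, not the actual implementation), each updated input row is split into two output records:
   
   ```
   // DELETE carries only the row ID and metadata attributes; INSERT carries
   // the new data column values, e.g. for UPDATE t SET c1 = c1 - 10.
   case class UpdateInput(file: String, pos: Long, id: Int, c1: Int)
   case class OutputRecord(op: String, file: Option[String], pos: Option[Long],
                           id: Option[Int], c1: Option[Int])
   
   def split(r: UpdateInput): Seq[OutputRecord] = Seq(
     OutputRecord("DELETE", Some(r.file), Some(r.pos), None, None),
     OutputRecord("INSERT", None, None, Some(r.id), Some(r.c1 - 10)))
   ```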




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202755142


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =

Review Comment:
   This block ensures the same sub expressions are evaluated only once.
   For instance, suppose I have `c1 = id - 10, c2 = id - 10`. This ensures `id - 10` is computed once rather than once per output expression, matching the behavior of the regular projection.
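   
   As a side note for anyone verifying this locally, the behavior is gated by an existing Spark flag (a sketch, assuming the standard SQLConf key):
   
   ```
   // With subexpression elimination disabled, each output expression generates
   // its own `id - 10` evaluation; with it enabled, the value is shared.
   spark.conf.set("spark.sql.subexpressionElimination.enabled", "true")
   spark.sql(s"UPDATE $tableName SET c1 = id - 10, c2 = id - 10")
   ```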





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204742103


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   That's a good catch.





[GitHub] [iceberg] aokolnychyi commented on pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#issuecomment-1654636801

   I will change this approach in a bit.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202752035


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {

Review Comment:
   This is a similar optimization to the one in `ProjectExec`. It ensures that if the same input attribute is used in multiple output expressions, we only fetch it once.
   
   Suppose I have `c1 = id + 10, c2 = id - 10`. Instead of fetching `id` for each output expression, we can do it once and reuse the same variable.
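   
   A standalone sketch of the counting logic, with plain strings standing in for `ExprId`s:
   
   ```
   // Attributes referenced across all insert output expressions; `id` appears twice.
   val used = Seq("id", "id", "c3")
   val usedMoreThanOnce = used.groupBy(identity).filter(_._2.size > 1).keySet
   assert(usedMoreThanOnce == Set("id"))  // only `id` is evaluated up front
   ```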





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202753561


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {

Review Comment:
   This is inspired by `ProjectExec` and `GenerateExec` in Spark. Checking out those nodes helps when reviewing this change.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204743595


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
                         branch)));
   }
 
+  @Test
+  public void subExpressionEliminationInCodegen() {
+    createAndInitTable("id INT, c1 INT, c2 INT");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", tableName);
+    createBranchIfNeeded();
+
+    // disable AQE to see the final plan with codegen in EXPLAIN

Review Comment:
   Let me play around.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204407229


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
                         branch)));
   }
 
+  @Test
+  public void subExpressionEliminationInCodegen() {

Review Comment:
   Should we now run TestUpdate both with and without codegen? At the moment, if there is an issue in codegen, it silently falls back to interpreted mode, which we don't intend. Thoughts?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1203217632


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)

Review Comment:
   Unfortunately, Spark only allows non-deterministic expressions in a few nodes, and all custom nodes are prohibited. We will need to change Spark to allow non-deterministic expressions in assignments. It is a similar problem for MERGE operations.
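   
   For illustration, a hypothetical assignment that runs into this restriction today (not from this PR):
   
   ```
   // Spark's analyzer only allows non-deterministic expressions in a few
   // operators, so an assignment like this currently fails analysis.
   spark.sql(s"UPDATE $tableName SET c1 = CAST(rand() * 100 AS INT)")
   ```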





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204744878


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
                         branch)));
   }
 
+  @Test
+  public void subExpressionEliminationInCodegen() {

Review Comment:
   I thought about disabling the fallback to cut the time. Maybe we can further parameterize this suite. Let me see.
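   
   For reference, a sketch of how the fallback could be disabled (assuming the standard SQLConf keys, as the benchmark change in this PR does):
   
   ```
   import org.apache.spark.sql.internal.SQLConf
   
   // Force codegen with no interpreted fallback so codegen bugs fail loudly
   // instead of silently degrading to interpreted mode.
   spark.conf.set(SQLConf.CODEGEN_FACTORY_MODE.key, "CODEGEN_ONLY")
   spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
   ```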





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1203155869


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)

Review Comment:
   Trying to follow this, is there any test that activates this part?
   
   I walked through the new tests but didn't see this path activated; maybe it's covered by existing ones.
   
   And there's no concept of this in delete records, is there?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202770697


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/UpdateProjectionBenchmark.java:
##########
@@ -146,6 +146,7 @@ private void setupSpark() {
             .config(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false")
             .config(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false")
             .config(SQLConf.SHUFFLE_PARTITIONS().key(), "2")
+            .config(SQLConf.CODEGEN_FACTORY_MODE().key(), "CODEGEN_ONLY")

Review Comment:
   I did not see a big performance improvement in the existing benchmark, as read and write dominate, except in the case with lots of updates. However, I've seen reduced memory pressure. Remember that the new approach without codegen was only a bit slower than the original projection. That said, codegen provides other benefits like sub expression elimination, and it is important to be on par with the projection in terms of features.
   
   ```
         Benchmark                                                                              Mode  Cnt            Score             Error   Units
   [OLD] UpdateProjectionBenchmark.mergeOnRead10Percent                                           ss    5            4.915 ±           0.058    s/op
   [OLD] UpdateProjectionBenchmark.mergeOnRead10Percent:·gc.count                                 ss    5           12.000                    counts
   [NEW] UpdateProjectionBenchmark.mergeOnRead10Percent                                           ss    5            4.920 ±           0.080    s/op
   [NEW] UpdateProjectionBenchmark.mergeOnRead10Percent:·gc.count                                 ss    5           11.000                    counts
   
   [OLD] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent                                     ss    5           10.146 ±           0.347    s/op
   [OLD] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent:·gc.count                           ss    5           25.000                    counts
   [NEW] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent                                     ss    5           10.104 ±           0.122    s/op
   [NEW] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent:·gc.count                           ss    5           20.000                    counts
   
   [OLD] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent                                     ss    5           26.108 ±           0.343    s/op
   [OLD] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent:·gc.count                           ss    5          102.000                    counts
   [NEW] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent                                     ss    5           24.331 ±           0.392    s/op
   [NEW] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent:·gc.count                           ss    5           32.000                    counts
   ```





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202757906


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)
+      .collect { case (expr, attr) if !expr.deterministic => attr }
+    val nonDeterministicInsertAttrSet = AttributeSet(nonDeterministicInsertAttrs)
+
+    s"""
+       |// generate DELETE record
+       |${consume(ctx, deleteOutputVars)}
+       |// generate INSERT records
+       |${evaluateVariables(insertLocalInputVars)}

Review Comment:
   Similar to `ProjectExec`, but it also generates a DELETE record before the INSERT records.




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204402182


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
                         branch)));
   }
 
+  @Test
+  public void subExpressionEliminationInCodegen() {
+    createAndInitTable("id INT, c1 INT, c2 INT");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", tableName);
+    createBranchIfNeeded();
+
+    // disable AQE to see the final plan with codegen in EXPLAIN

Review Comment:
   Can we do something like:
   ```
   def getCodeAndCommentForUpdateRowsExec(df: DataFrame): CodeAndComment = {
     val plan = df.queryExecution.executedPlan
     plan.execute()
     val updateRowsExec = findTopLevelUpdateRowsExec(plan)
     getCodeAndComment(updateRowsExec.head)
   }
   
   def findTopLevelUpdateRowsExec(plan: SparkPlan): Seq[UpdateRowsExec] = {
     plan.collect { case e: UpdateRowsExec => e }
   }
   
   private def getCodeAndComment(plan: SparkPlan): CodeAndComment = {
     val codeGenSubTree = WholeStageCodegenExec(plan)(1)
     val codeAndComment = codeGenSubTree.doCodeGen()._2
     try {
       CodeGenerator.compile(codeAndComment)
     } catch {
       case e: Exception =>
         val msg =
           s"""
              |failed to compile:
              |Subtree:
              |$codeGenSubTree
              |Generated code:
              |${CodeFormatter.format(codeAndComment)}
             """.stripMargin
         fail(msg, e)
     }
     codeAndComment
   }
   ```
   
   This way we can test plans both with and without AQE. Thoughts @aokolnychyi?



##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   Should we also make `needCopyResult` true, considering this operator produces two rows for a single input row?
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L348
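   
   A minimal sketch of the suggested override (assuming the operator keeps the `CodegenSupport` mixin):
   
   ```
   // Downstream operators may buffer rows; since this node emits two rows per
   // input row from reused row writers, results would need to be copied.
   override def needCopyResult: Boolean = true
   ```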



##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)
+      .collect { case (expr, attr) if !expr.deterministic => attr }
+    val nonDeterministicInsertAttrSet = AttributeSet(nonDeterministicInsertAttrs)
+
+    s"""
+       |// generate DELETE record
+       |${consume(ctx, deleteOutputVars)}
+       |// generate INSERT records
+       |${evaluateVariables(insertLocalInputVars)}

Review Comment:
   Wondering if we should split the DELETE and INSERT record generation into separate functions, as the combined code might exceed `CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT`. `ExpandExec` also had to do this at one point: https://github.com/apache/spark/pull/32457. Thoughts @aokolnychyi?
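   
   A rough sketch of the splitting idea via `CodegenContext.addNewFunction` (illustrative only; the method body is a placeholder, not this PR's actual change):
   
   ```
   import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
   
   val ctx = new CodegenContext
   // Register the INSERT-record generation as its own method so the per-row
   // consume code stays below the huge-method threshold.
   val insertFunc = ctx.addNewFunction(
     "generateInsertRecord",
     """
       |private void generateInsertRecord(InternalRow inputRow) {
       |  // project the INSERT output from inputRow and append it (placeholder)
       |}
     """.stripMargin)
   val callSite = s"$insertFunc(inputRow);"  // emitted from doConsume
   ```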





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204742905


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)
+      .collect { case (expr, attr) if !expr.deterministic => attr }
+    val nonDeterministicInsertAttrSet = AttributeSet(nonDeterministicInsertAttrs)
+
+    s"""
+       |// generate DELETE record
+       |${consume(ctx, deleteOutputVars)}
+       |// generate INSERT records
+       |${evaluateVariables(insertLocalInputVars)}

Review Comment:
   I thought about this but was not sure given that we only have two projections. It is probably safer to split. I'll make the change.





[GitHub] [iceberg] aokolnychyi closed pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi closed pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec
URL: https://github.com/apache/iceberg/pull/7691




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202761041


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
                         branch)));
   }
 
+  @Test
+  public void subExpressionEliminationInCodegen() {
+    createAndInitTable("id INT, c1 INT, c2 INT");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", tableName);
+    createBranchIfNeeded();
+
+    // disable AQE to see the final plan with codegen in EXPLAIN
+    withSQLConf(
+        ImmutableMap.of(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false"),
+        () -> {
+          String code = generateCode("UPDATE %s SET c1 = c1 - 11, c2 = c1 - 11", commitTarget());

Review Comment:
   Here is the output for the first stage of this operation.
   
   ```
   == Subtree 1 / 2 (maxMethodCodeSize:530; maxConstantPoolSize:144(0.22% used); numInnerClasses:0) ==
   *(1) UpdateRowsExec[__row_operation#43, id#44, c1#45, c2#46, _file#47, _pos#48L, _spec_id#49, _partition#50]
   +- *(1) Project [id#34, c1#35, c2#36, _file#39, _pos#40L, _spec_id#37, _partition#38]
      +- BatchScan testhive.default.table[id#34, c1#35, c2#36, _file#39, _pos#40L, _spec_id#37, _partition#38] testhive.default.table (branch=main) [filters=, groupedBy=] RuntimeFilters: []
   
   Generated code:
   /* 001 */ public Object generate(Object[] references) {
   /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ // codegenStageId=1
   /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
   /* 007 */   private Object[] references;
   /* 008 */   private scala.collection.Iterator[] inputs;
   /* 009 */   private scala.collection.Iterator inputadapter_input_0;
   /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
   /* 011 */
   /* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
   /* 013 */     this.references = references;
   /* 014 */   }
   /* 015 */
   /* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 017 */     partitionIndex = index;
   /* 018 */     this.inputs = inputs;
   /* 019 */     inputadapter_input_0 = inputs[0];
   /* 020 */     project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(7, 64);
   /* 021 */     project_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[0], 0);
   /* 022 */     project_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
   /* 023 */     project_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[2], 0);
   /* 024 */     project_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
   /* 025 */     project_mutableStateArray_0[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[4], 0);
   /* 026 */
   /* 027 */   }
   /* 028 */
   /* 029 */   protected void processNext() throws java.io.IOException {
   /* 030 */     while ( inputadapter_input_0.hasNext()) {
   /* 031 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
   /* 032 */
   /* 033 */       // common sub-expressions
   /* 034 */
   /* 035 */       boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
   /* 036 */       int inputadapter_value_1 = inputadapter_isNull_1 ?
   /* 037 */       -1 : (inputadapter_row_0.getInt(1));
   /* 038 */
   /* 039 */       // generate DELETE record
   /* 040 */
   /* 041 */       UTF8String inputadapter_value_3 = inputadapter_row_0.getUTF8String(3);
   /* 042 */       long inputadapter_value_4 = inputadapter_row_0.getLong(4);
   /* 043 */       int inputadapter_value_5 = inputadapter_row_0.getInt(5);
   /* 044 */       boolean inputadapter_isNull_6 = inputadapter_row_0.isNullAt(6);
   /* 045 */       InternalRow inputadapter_value_6 = inputadapter_isNull_6 ?
   /* 046 */       null : (inputadapter_row_0.getStruct(6, 0));
   /* 047 */       project_mutableStateArray_0[2].reset();
   /* 048 */
   /* 049 */       project_mutableStateArray_0[2].zeroOutNullBytes();
   /* 050 */
   /* 051 */       project_mutableStateArray_0[2].write(0, 1);
   /* 052 */
   /* 053 */       if (true) {
   /* 054 */         project_mutableStateArray_0[2].setNullAt(1);
   /* 055 */       } else {
   /* 056 */         project_mutableStateArray_0[2].write(1, -1);
   /* 057 */       }
   /* 058 */
   /* 059 */       if (true) {
   /* 060 */         project_mutableStateArray_0[2].setNullAt(2);
   /* 061 */       } else {
   /* 062 */         project_mutableStateArray_0[2].write(2, -1);
   /* 063 */       }
   /* 064 */
   /* 065 */       if (true) {
   /* 066 */         project_mutableStateArray_0[2].setNullAt(3);
   /* 067 */       } else {
   /* 068 */         project_mutableStateArray_0[2].write(3, -1);
   /* 069 */       }
   /* 070 */
   /* 071 */       if (false) {
   /* 072 */         project_mutableStateArray_0[2].setNullAt(4);
   /* 073 */       } else {
   /* 074 */         project_mutableStateArray_0[2].write(4, inputadapter_value_3);
   /* 075 */       }
   /* 076 */
   /* 077 */       if (false) {
   /* 078 */         project_mutableStateArray_0[2].setNullAt(5);
   /* 079 */       } else {
   /* 080 */         project_mutableStateArray_0[2].write(5, inputadapter_value_4);
   /* 081 */       }
   /* 082 */
   /* 083 */       if (false) {
   /* 084 */         project_mutableStateArray_0[2].setNullAt(6);
   /* 085 */       } else {
   /* 086 */         project_mutableStateArray_0[2].write(6, inputadapter_value_5);
   /* 087 */       }
   /* 088 */
   /* 089 */       if (inputadapter_isNull_6) {
   /* 090 */         project_mutableStateArray_0[2].setNullAt(7);
   /* 091 */       } else {
   /* 092 */         final InternalRow updaterows_tmpInput_0 = inputadapter_value_6;
   /* 093 */         if (updaterows_tmpInput_0 instanceof UnsafeRow) {
   /* 094 */           project_mutableStateArray_0[2].write(7, (UnsafeRow) updaterows_tmpInput_0);
   /* 095 */         } else {
   /* 096 */           // Remember the current cursor so that we can calculate how many bytes are
   /* 097 */           // written later.
   /* 098 */           final int updaterows_previousCursor_0 = project_mutableStateArray_0[2].cursor();
   /* 099 */
   /* 100 */           project_mutableStateArray_0[3].resetRowWriter();
   /* 101 */
   /* 102 */           project_mutableStateArray_0[2].setOffsetAndSizeFromPreviousCursor(7, updaterows_previousCursor_0);
   /* 103 */         }
   /* 104 */       }
   /* 105 */       append((project_mutableStateArray_0[2].getRow()));
   /* 106 */
   /* 107 */       // generate INSERT records
   /* 108 */
   /* 109 */       boolean updaterows_isNull_8 = true;
   /* 110 */       int updaterows_value_8 = -1;
   /* 111 */
   /* 112 */       if (!inputadapter_isNull_1) {
   /* 113 */         updaterows_isNull_8 = false; // resultCode could change nullability.
   /* 114 */
   /* 115 */         updaterows_value_8 = inputadapter_value_1 - 11;
   /* 116 */
   /* 117 */       }
   /* 118 */
   /* 119 */       boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
   /* 120 */       int inputadapter_value_0 = inputadapter_isNull_0 ?
   /* 121 */       -1 : (inputadapter_row_0.getInt(0));
   /* 122 */       project_mutableStateArray_0[4].reset();
   /* 123 */
   /* 124 */       project_mutableStateArray_0[4].zeroOutNullBytes();
   /* 125 */
   /* 126 */       project_mutableStateArray_0[4].write(0, 3);
   /* 127 */
   /* 128 */       if (inputadapter_isNull_0) {
   /* 129 */         project_mutableStateArray_0[4].setNullAt(1);
   /* 130 */       } else {
   /* 131 */         project_mutableStateArray_0[4].write(1, inputadapter_value_0);
   /* 132 */       }
   /* 133 */
   /* 134 */       if (updaterows_isNull_8) {
   /* 135 */         project_mutableStateArray_0[4].setNullAt(2);
   /* 136 */       } else {
   /* 137 */         project_mutableStateArray_0[4].write(2, updaterows_value_8);
   /* 138 */       }
   /* 139 */
   /* 140 */       if (updaterows_isNull_8) {
   /* 141 */         project_mutableStateArray_0[4].setNullAt(3);
   /* 142 */       } else {
   /* 143 */         project_mutableStateArray_0[4].write(3, updaterows_value_8);
   /* 144 */       }
   /* 145 */
   /* 146 */       if (true) {
   /* 147 */         project_mutableStateArray_0[4].setNullAt(4);
   /* 148 */       } else {
   /* 149 */         project_mutableStateArray_0[4].write(4, ((UTF8String)null));
   /* 150 */       }
   /* 151 */
   /* 152 */       if (true) {
   /* 153 */         project_mutableStateArray_0[4].setNullAt(5);
   /* 154 */       } else {
   /* 155 */         project_mutableStateArray_0[4].write(5, -1L);
   /* 156 */       }
   /* 157 */
   /* 158 */       if (true) {
   /* 159 */         project_mutableStateArray_0[4].setNullAt(6);
   /* 160 */       } else {
   /* 161 */         project_mutableStateArray_0[4].write(6, -1);
   /* 162 */       }
   /* 163 */
   /* 164 */       if (true) {
   /* 165 */         project_mutableStateArray_0[4].setNullAt(7);
   /* 166 */       } else {
   /* 167 */         final InternalRow wholestagecodegen_tmpInput_0 = ((InternalRow)null);
   /* 168 */         if (wholestagecodegen_tmpInput_0 instanceof UnsafeRow) {
   /* 169 */           project_mutableStateArray_0[4].write(7, (UnsafeRow) wholestagecodegen_tmpInput_0);
   /* 170 */         } else {
   /* 171 */           // Remember the current cursor so that we can calculate how many bytes are
   /* 172 */           // written later.
   /* 173 */           final int wholestagecodegen_previousCursor_0 = project_mutableStateArray_0[4].cursor();
   /* 174 */
   /* 175 */           project_mutableStateArray_0[5].resetRowWriter();
   /* 176 */
   /* 177 */           project_mutableStateArray_0[4].setOffsetAndSizeFromPreviousCursor(7, wholestagecodegen_previousCursor_0);
   /* 178 */         }
   /* 179 */       }
   /* 180 */       append((project_mutableStateArray_0[4].getRow()));
   /* 181 */       if (shouldStop()) return;
   /* 182 */     }
   /* 183 */   }
   /* 184 */
   /* 185 */ }
   ```
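
   The tail of the dump also shows sub-expression elimination at work: `inputadapter_value_1 - 11` is evaluated once into `updaterows_value_8` and then written to two ordinals (2 and 3) of the INSERT row, which is what you would expect from two assignments sharing the same expression. A minimal sketch of an UPDATE that could produce codegen of this shape (table and column names are hypothetical):

   ```
   // hypothetical statement: both assignments share the sub-expression
   // (id - 11), which the generated code above evaluates exactly once
   spark.sql("UPDATE db.tbl SET c1 = id - 11, c2 = id - 11")
   ```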





[GitHub] [iceberg] aokolnychyi commented on pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#issuecomment-1559885086

   cc @singhpk234 @amogh-jahagirdar @RussellSpitzer @szehon-ho @flyrain 




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7691: Spark 3.4: Codegen support for UpdateRowsExec

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1203217632


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub-expression elimination for the delete projection
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)

Review Comment:
   Unfortunately, Spark only allows non-deterministic expressions in a few built-in nodes; all custom nodes are prohibited. Allowing non-deterministic expressions in UPDATE assignments will require a change in Spark itself, and MERGE operations have the same limitation. Until then, I can't add a test for this path. That said, vendors that maintain their own Spark distributions may lift the restriction, in which case this logic would apply.
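
   To make the blocked case concrete, here is a sketch of the kind of test that cannot be added until Spark lifts the restriction (the table name and assignment are hypothetical):

   ```
   // hypothetical test, blocked today: rand() is non-deterministic, so
   // Spark's analyzer rejects the statement before UpdateRowsExec runs
   sql(s"UPDATE $tableName SET salary = salary + CAST(rand() * 100 AS INT)")
   ```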


