Posted to reviews@spark.apache.org by "wankunde (via GitHub)" <gi...@apache.org> on 2023/03/22 14:19:46 UTC

[GitHub] [spark] wankunde opened a new pull request, #40523: [SPARK-42897][SQL] Avoid evaluate more than once for the variables from the left side in the FullOuter SMJ condition

wankunde opened a new pull request, #40523:
URL: https://github.com/apache/spark/pull/40523

   
   
   ### What changes were proposed in this pull request?
   
   For example:
   ```
     val df1 = spark.range(5).select($"id".as("k1"))
     val df2 = spark.range(10).select($"id".as("k2"))
     df1.join(df2.hint("SHUFFLE_MERGE"),
         $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2" && $"k1" + 5 =!= $"k2", "full_outer")
   ```
   The join conditions **$"k1" + 3 =!= $"k2"** and **$"k1" + 5 =!= $"k2"** each evaluate the variable **k1**, so it is evaluated more than once and codegen fails.
   This PR evaluates the left-side variables that are used in the join condition once, before generating code for the condition.
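   A toy model may help illustrate why evaluating the variable once up front avoids the duplicate definition. This is only a sketch under stated assumptions, not Spark's code: `ToyVar`, `genNotEqual`, `declarationsOfK1`, and this simplified `evaluateVariables` are hypothetical stand-ins that work on plain strings.

   ```java
   import java.util.List;

   final class EvaluateOnceSketch {
       /** Toy stand-in for a codegen variable (hypothetical, not Spark's real class). */
       static final class ToyVar {
           String code;        // pending declaration snippet; empty once it has been emitted
           final String value; // name of the Java local that holds the result

           ToyVar(String code, String value) {
               this.code = code;
               this.value = value;
           }
       }

       /** Emits each pending declaration once and clears it, so later users only see the name. */
       static String evaluateVariables(List<ToyVar> vars) {
           StringBuilder out = new StringBuilder();
           for (ToyVar v : vars) {
               out.append(v.code);
               v.code = "";
           }
           return out.toString();
       }

       /** Generates one "not equal" predicate, inlining whatever declaration is still pending. */
       static String genNotEqual(ToyVar left, long offset, String right) {
           return left.code
               + "boolean ne" + offset + " = (" + left.value + " + " + offset + "L) != " + right + ";\n";
       }

       /** Builds the code for both predicates and counts how often k1 is declared. */
       static int declarationsOfK1(boolean evaluateFirst) {
           ToyVar k1 = new ToyVar("long k1 = leftRow.getLong(0);\n", "k1");
           String prefix = evaluateFirst ? evaluateVariables(List.of(k1)) : "";
           String generated = prefix + genNotEqual(k1, 3, "k2") + genNotEqual(k1, 5, "k2");
           return generated.split("long k1 =", -1).length - 1;
       }

       public static void main(String[] args) {
           System.out.println("without pre-evaluation: " + declarationsOfK1(false) + " declaration(s)");
           System.out.println("with pre-evaluation:    " + declarationsOfK1(true) + " declaration(s)");
       }
   }
   ```

   In this model, two declarations of the same local in one scope correspond to the compile failure, while pre-evaluation leaves a single declaration that both predicates reference by name.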
   
   ### Why are the changes needed?
   
   Bug fix for codegen issue in FullOuter SMJ.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Added a unit test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160456984


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -364,10 +364,18 @@ case class ShuffledHashJoinExec(
          |${streamedKeyExprCode.code}
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
-
+    // The streamedVars may be evaluated again in the following consumeFullOuterJoinRow method,
+    // so generate the condition checking code with their copies.

Review Comment:
   ```suggestion
       // so generate the condition checking code with their copies, to avoid side effects (ExprCode is mutable).
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1159811512


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -365,9 +365,21 @@ case class ShuffledHashJoinExec(
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
 
+    // Evaluate the variables from the stream side and used in the condition but do not clear the
+    // code as they may be used in the following function.

Review Comment:
   I don't quite understand why it's only a bug for full outer join. Inner join invokes `getJoinCondition` as well.





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160396781


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##########
@@ -1036,8 +1013,14 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    val copiedLeftResultVars = leftResultVars.map(v => v.copy())

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##########
@@ -1036,8 +1013,14 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    val copiedLeftResultVars = leftResultVars.map(v => v.copy())
+    val (leftBefore, _) = splitVarsByCondition(left.output, copiedLeftResultVars)
+    val (_, conditionCheckWithoutLeftVars, _) =
+      getJoinCondition(ctx, copiedLeftResultVars, left, right, Some(rightOutputRow))
+    val conditionCheck =

Review Comment:
   ditto





[GitHub] [spark] github-actions[bot] commented on pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #40523:
URL: https://github.com/apache/spark/pull/40523#issuecomment-1637231202

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160396428


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -364,10 +364,15 @@ case class ShuffledHashJoinExec(
          |${streamedKeyExprCode.code}
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
-
+    val copiedStreamedVars = streamedVars.map(v => v.copy())
+    val (streamedBefore, _) = splitVarsByCondition(streamedOutput, copiedStreamedVars)
     // Generate code for join condition
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+    val (_, conditionCheckWithoutStreamVars, _) =
+      getJoinCondition(ctx, copiedStreamedVars, streamedPlan, buildPlan, Some(buildRow))
+    val conditionCheck =
+      s"""$streamedBefore

Review Comment:
   nit:
   ```
   s"""
      |...
      |...
      |""".stripMargin
   ```





[GitHub] [spark] wankunde commented on pull request #40523: [SPARK-42897][SQL] Avoid evaluate more than once for the variables from the left side in the FullOuter SMJ condition

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #40523:
URL: https://github.com/apache/spark/pull/40523#issuecomment-1486090996

   Hi @c21 @cloud-fan, this seems to be a codegen bug in SMJ full outer join. Could you take a look at this issue? Thanks.




[GitHub] [spark] github-actions[bot] closed pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join
URL: https://github.com/apache/spark/pull/40523




[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160044695


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##########
@@ -1036,8 +1036,17 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    // Evaluate the variables on the left and used in the condition but do not clear the code as

Review Comment:
   > because evaluateVariables will empty the variable's code
   
   can we copy the variables and invoke `evaluateVariables` with the copies? Then the original variables are unchanged and can be safely accessed later.
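
   A minimal sketch of this copy-then-evaluate idea, using a toy class rather than Spark's `ExprCode`. The names `ToyVar`, `evaluate`, and `originalKeepsCode` are hypothetical and exist only for illustration; the assumption modelled here is that evaluating a variable hands back its declaration code and empties it.

   ```java
   final class CopyBeforeEvaluateSketch {
       /** Toy mutable variable holder (hypothetical, not Spark's real class). */
       static final class ToyVar {
           String code;        // pending declaration snippet; emptied by evaluate()
           final String value; // name of the Java local that holds the result

           ToyVar(String code, String value) {
               this.code = code;
               this.value = value;
           }

           /** Returns an independent copy, so mutating it leaves this instance untouched. */
           ToyVar copy() {
               return new ToyVar(code, value);
           }
       }

       /** Returns the pending declaration of the variable and clears it. */
       static String evaluate(ToyVar v) {
           String emitted = v.code;
           v.code = "";
           return emitted;
       }

       /** Evaluates a copy and reports whether the original still carries its code. */
       static boolean originalKeepsCode() {
           ToyVar original = new ToyVar("long k1 = leftRow.getLong(0);", "k1");
           ToyVar copied = original.copy();
           String emitted = evaluate(copied);
           return copied.code.isEmpty()
               && !original.code.isEmpty()
               && emitted.equals(original.code);
       }

       public static void main(String[] args) {
           System.out.println("original keeps its code: " + originalKeepsCode());
       }
   }
   ```

   Because only the copy is mutated, a later consumer that reads the original still finds the declaration it needs.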





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160396011


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -364,10 +364,15 @@ case class ShuffledHashJoinExec(
          |${streamedKeyExprCode.code}
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
-
+    val copiedStreamedVars = streamedVars.map(v => v.copy())
+    val (streamedBefore, _) = splitVarsByCondition(streamedOutput, copiedStreamedVars)
     // Generate code for join condition
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+    val (_, conditionCheckWithoutStreamVars, _) =

Review Comment:
   actually I think the new name is a bit confusing.





[GitHub] [spark] wankunde commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160381740


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##########
@@ -1036,8 +1036,17 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    // Evaluate the variables on the left and used in the condition but do not clear the code as

Review Comment:
   Yes, you are right, I updated the code and it is much clearer than before. 
   Thanks for your help.





[GitHub] [spark] wankunde commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate more than once for the variables from the left side in the FullOuter SMJ condition

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1158021949


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##########
@@ -1036,8 +1036,17 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    // Evaluate the variables on the left and used in the condition but do not clear the code as

Review Comment:
   Thanks @cloud-fan for your comment.
    
   1. `ShuffledHashJoinExec` has the same issue, so this PR fixes it as well.
   2. We cannot use `splitVarsByCondition` together with `evaluateVariables`, because `evaluateVariables` empties each variable's code. When the same variables are read later in the `consumeFullOuterJoinRow` method, they would be undefined.
   





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1159807739


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -365,9 +365,21 @@ case class ShuffledHashJoinExec(
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
 
+    // Evaluate the variables from the stream side and used in the condition but do not clear the
+    // code as they may be used in the following function.

Review Comment:
   can you show a piece of the generated code to better demonstrate the bug?





[GitHub] [spark] wankunde commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1159889577


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -365,9 +365,21 @@ case class ShuffledHashJoinExec(
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
 
+    // Evaluate the variables from the stream side and used in the condition but do not clear the
+    // code as they may be used in the following function.

Review Comment:
   Physical plan of the unit test
   ```
   val joinWithNonEquiDF = df1.select('k1, 'k1.as('k1_2)).join(df2.hint(hint),
           $"k1" === $"k2" % 3 && $"k1_2" + 3 =!= $"k2" && $"k1_2" + 5 =!= $"k2",
           "full_outer")
   ```
   is
   ```
   == Subtree 5 / 5 (maxMethodCodeSize:-1; maxConstantPoolSize:-1; numInnerClasses:-1) ==
   *(5) SortMergeJoin [k1#2L], [(k2#6L % 3)], FullOuter, (NOT ((k1_2#35L + 3) = k2#6L) AND NOT ((k1_2#35L + 5) = k2#6L))
   :- *(2) Sort [k1#2L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(k1#2L, 5), ENSURE_REQUIREMENTS, [plan_id=332]
   :     +- *(1) Project [id#0L AS k1#2L, id#0L AS k1_2#35L]
   :        +- *(1) Range (0, 5, step=1, splits=2)
   +- *(4) Sort [(k2#6L % 3) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning((k2#6L % 3), 5), ENSURE_REQUIREMENTS, [plan_id=338]
         +- *(3) Project [id#4L AS k2#6L]
            +- *(3) Range (0, 10, step=1, splits=2)
   ```
   Before this PR, `getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))` generates the code from line 234 to line 273:
   ```java
   /* 001 */ public Object generate(Object[] references) {
   /* 002 */   return new GeneratedIteratorForCodegenStage5(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ // codegenStageId=5
   /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
   /* 007 */   private Object[] references;
   /* 008 */   private scala.collection.Iterator[] inputs;
   /* 009 */   private scala.collection.Iterator smj_leftInput_0;
   /* 010 */   private scala.collection.Iterator smj_rightInput_0;
   /* 011 */   private InternalRow smj_leftInputRow_0;
   /* 012 */   private InternalRow smj_rightInputRow_0;
   /* 013 */   private long smj_value_4;
   /* 014 */   private InternalRow smj_leftOutputRow_0;
   /* 015 */   private InternalRow smj_rightOutputRow_0;
   /* 016 */   private java.util.ArrayList<InternalRow> smj_leftBuffer_0;
   /* 017 */   private java.util.ArrayList<InternalRow> smj_rightBuffer_0;
   /* 018 */   private org.apache.spark.util.collection.BitSet smj_leftMatched_0;
   /* 019 */   private org.apache.spark.util.collection.BitSet smj_rightMatched_0;
   /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
   /* 021 */
   /* 022 */   public GeneratedIteratorForCodegenStage5(Object[] references) {
   /* 023 */     this.references = references;
   /* 024 */   }
   /* 025 */
   /* 026 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 027 */     partitionIndex = index;
   /* 028 */     this.inputs = inputs;
   /* 029 */     smj_leftInput_0 = inputs[0];
   /* 030 */     smj_rightInput_0 = inputs[1];
   /* 031 */
   /* 032 */     smj_leftBuffer_0 = new java.util.ArrayList<InternalRow>();
   /* 033 */     smj_rightBuffer_0 = new java.util.ArrayList<InternalRow>();
   /* 034 */     smj_leftMatched_0 = new org.apache.spark.util.collection.BitSet(1);
   /* 035 */     smj_rightMatched_0 = new org.apache.spark.util.collection.BitSet(1);
   /* 036 */     smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
   /* 037 */
   /* 038 */   }
   /* 039 */
   /* 040 */   private void smj_consumeFullOuterJoinRow_0() throws java.io.IOException {
   /* 041 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
   /* 042 */
   /* 043 */     boolean smj_isNull_9 = true;
   /* 044 */     long smj_value_10 = -1L;
   /* 045 */     if (smj_leftOutputRow_0 != null) {
   /* 046 */       long smj_value_9 = smj_leftOutputRow_0.getLong(0);
   /* 047 */       smj_isNull_9 = false;
   /* 048 */       smj_value_10 = smj_value_9;
   /* 049 */     }
   /* 050 */     boolean smj_isNull_11 = true;
   /* 051 */     long smj_value_12 = -1L;
   /* 052 */     if (smj_leftOutputRow_0 != null) {
   /* 053 */       long smj_value_11 = smj_leftOutputRow_0.getLong(1);
   /* 054 */       smj_isNull_11 = false;
   /* 055 */       smj_value_12 = smj_value_11;
   /* 056 */     }
   /* 057 */     boolean smj_isNull_13 = true;
   /* 058 */     long smj_value_14 = -1L;
   /* 059 */     if (smj_rightOutputRow_0 != null) {
   /* 060 */       long smj_value_13 = smj_rightOutputRow_0.getLong(0);
   /* 061 */       smj_isNull_13 = false;
   /* 062 */       smj_value_14 = smj_value_13;
   /* 063 */     }
   /* 064 */     smj_mutableStateArray_0[0].reset();
   /* 065 */
   /* 066 */     smj_mutableStateArray_0[0].zeroOutNullBytes();
   /* 067 */
   /* 068 */     if (smj_isNull_9) {
   /* 069 */       smj_mutableStateArray_0[0].setNullAt(0);
   /* 070 */     } else {
   /* 071 */       smj_mutableStateArray_0[0].write(0, smj_value_10);
   /* 072 */     }
   /* 073 */
   /* 074 */     if (smj_isNull_11) {
   /* 075 */       smj_mutableStateArray_0[0].setNullAt(1);
   /* 076 */     } else {
   /* 077 */       smj_mutableStateArray_0[0].write(1, smj_value_12);
   /* 078 */     }
   /* 079 */
   /* 080 */     if (smj_isNull_13) {
   /* 081 */       smj_mutableStateArray_0[0].setNullAt(2);
   /* 082 */     } else {
   /* 083 */       smj_mutableStateArray_0[0].write(2, smj_value_14);
   /* 084 */     }
   /* 085 */     append((smj_mutableStateArray_0[0].getRow()).copy());
   /* 086 */
   /* 087 */   }
      // ... lines 088 to 219 (method wholestagecodegen_findNextJoinRows_0) omitted ...
   /* 220 */
   /* 221 */   protected void processNext() throws java.io.IOException {
   /* 222 */     while ((smj_leftInputRow_0 != null || smj_leftInput_0.hasNext()) &&
   /* 223 */       (smj_rightInputRow_0 != null || smj_rightInput_0.hasNext())) {
   /* 224 */       wholestagecodegen_findNextJoinRows_0(smj_leftInput_0, smj_rightInput_0);
   /* 225 */
   /* 226 */       int smj_leftIndex_0;
   /* 227 */       int smj_rightIndex_0;
   /* 228 */
   /* 229 */       for (smj_leftIndex_0 = 0; smj_leftIndex_0 < smj_leftBuffer_0.size(); smj_leftIndex_0++) {
   /* 230 */         smj_leftOutputRow_0 = (InternalRow) smj_leftBuffer_0.get(smj_leftIndex_0);
   /* 231 */         for (smj_rightIndex_0 = 0; smj_rightIndex_0 < smj_rightBuffer_0.size(); smj_rightIndex_0++) {
   /* 232 */           smj_rightOutputRow_0 = (InternalRow) smj_rightBuffer_0.get(smj_rightIndex_0);
   /* 233 */
         // Codegen conditionCheck start
   /* 234 */           long smj_value_15 = smj_rightOutputRow_0.getLong(0);  // Codegen for k2#6L
   /* 235 */
      // Variable k1_2#35L is evaluated three times: at lines 239, 258, and 53.
      // Codegen for expression NOT ((k1_2#35L + 3) = k2#6L): evaluates k1_2#35L for the first time.
   /* 236 */           boolean smj_isNull_11 = true;
   /* 237 */           long smj_value_12 = -1L;
   /* 238 */           if (smj_leftOutputRow_0 != null) {
   /* 239 */             long smj_value_11 = smj_leftOutputRow_0.getLong(1); // Codegen for k1_2#35L
   /* 240 */             smj_isNull_11 = false;
   /* 241 */             smj_value_12 = smj_value_11;
   /* 242 */           }
   /* 243 */
   /* 244 */           long smj_value_19 = -1L;
   /* 245 */
   /* 246 */           smj_value_19 = smj_value_12 + 3L;    // Codegen for k1_2#35L + 3
   /* 247 */
   /* 248 */           boolean smj_value_18 = false;
   /* 249 */           smj_value_18 = smj_value_19 == smj_value_15;
   /* 250 */           boolean smj_value_17 = false;
   /* 251 */           smj_value_17 = !(smj_value_18);
   /* 252 */           boolean smj_value_16 = false;
   /* 253 */
      // Codegen for expression NOT ((k1_2#35L + 5) = k2#6L): evaluates k1_2#35L again, which throws a 'Redefinition of local variable "smj_isNull_11"' exception.
   /* 254 */           if (smj_value_17) {
   /* 255 */             boolean smj_isNull_11 = true;
   /* 256 */             long smj_value_12 = -1L;
   /* 257 */             if (smj_leftOutputRow_0 != null) {
   /* 258 */               long smj_value_11 = smj_leftOutputRow_0.getLong(1);   // Codegen for k1_2#35L
   /* 259 */               smj_isNull_11 = false;
   /* 260 */               smj_value_12 = smj_value_11;
   /* 261 */             }
   /* 262 */
   /* 263 */             long smj_value_25 = -1L;
   /* 264 */
   /* 265 */             smj_value_25 = smj_value_12 + 5L;  // Codegen for k1_2#35L + 5
   /* 266 */
   /* 267 */             boolean smj_value_24 = false;
   /* 268 */             smj_value_24 = smj_value_25 == smj_value_15;
   /* 269 */             boolean smj_value_23 = false;
   /* 270 */             smj_value_23 = !(smj_value_24);
   /* 271 */             smj_value_16 = smj_value_23;
   /* 272 */           }
   /* 273 */           if (!(false || !smj_value_16))
         // Codegen conditionCheck end
   /* 274 */           {
   /* 275 */             smj_consumeFullOuterJoinRow_0();
   /* 276 */             smj_leftMatched_0.set(smj_leftIndex_0);
   /* 277 */             smj_rightMatched_0.set(smj_rightIndex_0);
   /* 278 */           }
   /* 279 */         }
   /* 280 */
   /* 281 */         if (!smj_leftMatched_0.get(smj_leftIndex_0)) {
   /* 282 */           smj_rightOutputRow_0 = null;
   /* 283 */           smj_consumeFullOuterJoinRow_0();
   /* 284 */         }
   /* 285 */       }
   /* 286 */
   /* 287 */       smj_leftOutputRow_0 = null;
   /* 288 */       for (smj_rightIndex_0 = 0; smj_rightIndex_0 < smj_rightBuffer_0.size(); smj_rightIndex_0++) {
   /* 289 */         if (!smj_rightMatched_0.get(smj_rightIndex_0)) {
   /* 290 */           // The right row has never matched any left row, join it with null row
   /* 291 */           smj_rightOutputRow_0 = (InternalRow) smj_rightBuffer_0.get(smj_rightIndex_0);
   /* 292 */           smj_consumeFullOuterJoinRow_0();
   /* 293 */         }
   /* 294 */       }
   /* 295 */
   /* 296 */       if (shouldStop()) return;
   /* 297 */     }
   /* 298 */
   /* 299 */     // The right iterator has no more rows, join left row with null
   /* 300 */     while (smj_leftInputRow_0 != null || smj_leftInput_0.hasNext()) {
   /* 301 */       if (smj_leftInputRow_0 == null) {
   /* 302 */         smj_leftInputRow_0 = (InternalRow) smj_leftInput_0.next();
   /* 303 */       }
   /* 304 */
   /* 305 */       smj_leftOutputRow_0 = smj_leftInputRow_0;
   /* 306 */       smj_rightOutputRow_0 = null;
   /* 307 */       smj_leftInputRow_0 = null;
   /* 308 */       smj_consumeFullOuterJoinRow_0();
   /* 309 */
   /* 310 */       if (shouldStop()) return;
   /* 311 */     }
   /* 312 */
   /* 313 */     // The left iterator has no more rows, join right row with null
   /* 314 */     while (smj_rightInputRow_0 != null || smj_rightInput_0.hasNext()) {
   /* 315 */       if (smj_rightInputRow_0 == null) {
   /* 316 */         smj_rightInputRow_0 = (InternalRow) smj_rightInput_0.next();
   /* 317 */       }
   /* 318 */
   /* 319 */       smj_rightOutputRow_0 = smj_rightInputRow_0;
   /* 320 */       smj_leftOutputRow_0 = null;
   /* 321 */       smj_rightInputRow_0 = null;
   /* 322 */       smj_consumeFullOuterJoinRow_0();
   /* 323 */
   /* 324 */       if (shouldStop()) return;
   /* 325 */     }
   /* 326 */   }
   /* 327 */
   /* 328 */ }
   ```
   
   After this PR, the generated code for the condition check is at lines 233 to 264:
   ```java
   /* 001 */ public Object generate(Object[] references) {
   /* 002 */   return new GeneratedIteratorForCodegenStage5(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ // codegenStageId=5
   /* 006 */ final class GeneratedIteratorForCodegenStage5 extends org.apache.spark.sql.execution.BufferedRowIterator {
   /* 007 */   private Object[] references;
   /* 008 */   private scala.collection.Iterator[] inputs;
   /* 009 */   private scala.collection.Iterator smj_leftInput_0;
   /* 010 */   private scala.collection.Iterator smj_rightInput_0;
   /* 011 */   private InternalRow smj_leftInputRow_0;
   /* 012 */   private InternalRow smj_rightInputRow_0;
   /* 013 */   private long smj_value_4;
   /* 014 */   private InternalRow smj_leftOutputRow_0;
   /* 015 */   private InternalRow smj_rightOutputRow_0;
   /* 016 */   private java.util.ArrayList<InternalRow> smj_leftBuffer_0;
   /* 017 */   private java.util.ArrayList<InternalRow> smj_rightBuffer_0;
   /* 018 */   private org.apache.spark.util.collection.BitSet smj_leftMatched_0;
   /* 019 */   private org.apache.spark.util.collection.BitSet smj_rightMatched_0;
   /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] smj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
   /* 021 */
   /* 022 */   public GeneratedIteratorForCodegenStage5(Object[] references) {
   /* 023 */     this.references = references;
   /* 024 */   }
   /* 025 */
   /* 026 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 027 */     partitionIndex = index;
   /* 028 */     this.inputs = inputs;
   /* 029 */     smj_leftInput_0 = inputs[0];
   /* 030 */     smj_rightInput_0 = inputs[1];
   /* 031 */
   /* 032 */     smj_leftBuffer_0 = new java.util.ArrayList<InternalRow>();
   /* 033 */     smj_rightBuffer_0 = new java.util.ArrayList<InternalRow>();
   /* 034 */     smj_leftMatched_0 = new org.apache.spark.util.collection.BitSet(1);
   /* 035 */     smj_rightMatched_0 = new org.apache.spark.util.collection.BitSet(1);
   /* 036 */     smj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
   /* 037 */
   /* 038 */   }
   /* 039 */
   /* 040 */   private void smj_consumeFullOuterJoinRow_0() throws java.io.IOException {
   /* 041 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
   /* 042 */
   /* 043 */     boolean smj_isNull_9 = true;
   /* 044 */     long smj_value_10 = -1L;
   /* 045 */     if (smj_leftOutputRow_0 != null) {
   /* 046 */       long smj_value_9 = smj_leftOutputRow_0.getLong(0);
   /* 047 */       smj_isNull_9 = false;
   /* 048 */       smj_value_10 = smj_value_9;
   /* 049 */     }
   /* 050 */     boolean smj_isNull_11 = true;
   /* 051 */     long smj_value_12 = -1L;
   /* 052 */     if (smj_leftOutputRow_0 != null) {
   /* 053 */       long smj_value_11 = smj_leftOutputRow_0.getLong(1);
   /* 054 */       smj_isNull_11 = false;
   /* 055 */       smj_value_12 = smj_value_11;
   /* 056 */     }
   /* 057 */     boolean smj_isNull_13 = true;
   /* 058 */     long smj_value_14 = -1L;
   /* 059 */     if (smj_rightOutputRow_0 != null) {
   /* 060 */       long smj_value_13 = smj_rightOutputRow_0.getLong(0);
   /* 061 */       smj_isNull_13 = false;
   /* 062 */       smj_value_14 = smj_value_13;
   /* 063 */     }
   /* 064 */     smj_mutableStateArray_0[0].reset();
   /* 065 */
   /* 066 */     smj_mutableStateArray_0[0].zeroOutNullBytes();
   /* 067 */
   /* 068 */     if (smj_isNull_9) {
   /* 069 */       smj_mutableStateArray_0[0].setNullAt(0);
   /* 070 */     } else {
   /* 071 */       smj_mutableStateArray_0[0].write(0, smj_value_10);
   /* 072 */     }
   /* 073 */
   /* 074 */     if (smj_isNull_11) {
   /* 075 */       smj_mutableStateArray_0[0].setNullAt(1);
   /* 076 */     } else {
   /* 077 */       smj_mutableStateArray_0[0].write(1, smj_value_12);
   /* 078 */     }
   /* 079 */
   /* 080 */     if (smj_isNull_13) {
   /* 081 */       smj_mutableStateArray_0[0].setNullAt(2);
   /* 082 */     } else {
   /* 083 */       smj_mutableStateArray_0[0].write(2, smj_value_14);
   /* 084 */     }
   /* 085 */     append((smj_mutableStateArray_0[0].getRow()).copy());
   /* 086 */
   /* 087 */   }
         // Method wholestagecodegen_findNextJoinRows_0
   /* 220 */
   /* 221 */   protected void processNext() throws java.io.IOException {
   /* 222 */     while ((smj_leftInputRow_0 != null || smj_leftInput_0.hasNext()) &&
   /* 223 */       (smj_rightInputRow_0 != null || smj_rightInput_0.hasNext())) {
                   // Find next join rows, which will be consumed later.
   /* 224 */       wholestagecodegen_findNextJoinRows_0(smj_leftInput_0, smj_rightInput_0);
   /* 225 */
   /* 226 */       int smj_leftIndex_0;
   /* 227 */       int smj_rightIndex_0;
   /* 228 */
   /* 229 */       for (smj_leftIndex_0 = 0; smj_leftIndex_0 < smj_leftBuffer_0.size(); smj_leftIndex_0++) {
   /* 230 */         smj_leftOutputRow_0 = (InternalRow) smj_leftBuffer_0.get(smj_leftIndex_0);
   /* 231 */         for (smj_rightIndex_0 = 0; smj_rightIndex_0 < smj_rightBuffer_0.size(); smj_rightIndex_0++) {
   /* 232 */           smj_rightOutputRow_0 = (InternalRow) smj_rightBuffer_0.get(smj_rightIndex_0);
         // Codegen conditionCheck start
         // Begin evaluating the variable k1_2#35L from the left side.
         // We cannot use the evaluateVariables utility method here, because the variable k1_2#35L will be evaluated again in smj_consumeFullOuterJoinRow_0(); see line 053.
   /* 233 */           boolean smj_isNull_11 = true;
   /* 234 */           long smj_value_12 = -1L;
   /* 235 */           if (smj_leftOutputRow_0 != null) {
   /* 236 */             long smj_value_11 = smj_leftOutputRow_0.getLong(1);   // Codegen for k1_2#35L
   /* 237 */             smj_isNull_11 = false;
   /* 238 */             smj_value_12 = smj_value_11;
   /* 239 */           }
         // Spark code: getJoinCondition(ctx, leftResultVars.map(_.copy(code = EmptyBlock)), left, right, Some(rightOutputRow))
         // To avoid evaluating the variables again, set the code of the variables to EmptyBlock.
         // CodeGen for expression: NOT ((k1_2#35L + 3) = k2#6L)
   /* 240 */
   /* 241 */           long smj_value_15 = smj_rightOutputRow_0.getLong(0);    // Codegen for k2#6L
   /* 242 */
   /* 243 */           long smj_value_19 = -1L;
   /* 244 */
   /* 245 */           smj_value_19 = smj_value_12 + 3L;    // Codegen for k1_2#35L + 3
   /* 246 */
   /* 247 */           boolean smj_value_18 = false;
   /* 248 */           smj_value_18 = smj_value_19 == smj_value_15;
   /* 249 */           boolean smj_value_17 = false;
   /* 250 */           smj_value_17 = !(smj_value_18);
   /* 251 */           boolean smj_value_16 = false;
   /* 252 */
         // CodeGen for expression: NOT ((k1_2#35L + 5) = k2#6L)
   /* 253 */           if (smj_value_17) {
   /* 254 */             long smj_value_25 = -1L;
   /* 255 */
   /* 256 */             smj_value_25 = smj_value_12 + 5L;    // Codegen for k1_2#35L + 5
   /* 257 */
   /* 258 */             boolean smj_value_24 = false;
   /* 259 */             smj_value_24 = smj_value_25 == smj_value_15;
   /* 260 */             boolean smj_value_23 = false;
   /* 261 */             smj_value_23 = !(smj_value_24);
   /* 262 */             smj_value_16 = smj_value_23;
   /* 263 */           }
   /* 264 */           if (!(false || !smj_value_16))
         // Codegen conditionCheck end
   /* 265 */
   /* 266 */           {
   /* 267 */             smj_consumeFullOuterJoinRow_0();
   /* 268 */             smj_leftMatched_0.set(smj_leftIndex_0);
   /* 269 */             smj_rightMatched_0.set(smj_rightIndex_0);
   /* 270 */           }
   /* 271 */         }
   /* 272 */
   /* 273 */         if (!smj_leftMatched_0.get(smj_leftIndex_0)) {
   /* 274 */           smj_rightOutputRow_0 = null;
   /* 275 */           smj_consumeFullOuterJoinRow_0();
   /* 276 */         }
   /* 277 */       }
   /* 278 */
   /* 279 */       smj_leftOutputRow_0 = null;
   /* 280 */       for (smj_rightIndex_0 = 0; smj_rightIndex_0 < smj_rightBuffer_0.size(); smj_rightIndex_0++) {
   /* 281 */         if (!smj_rightMatched_0.get(smj_rightIndex_0)) {
   /* 282 */           // The right row has never matched any left row, join it with null row
   /* 283 */           smj_rightOutputRow_0 = (InternalRow) smj_rightBuffer_0.get(smj_rightIndex_0);
   /* 284 */           smj_consumeFullOuterJoinRow_0();
   /* 285 */         }
   /* 286 */       }
   /* 287 */
   /* 288 */       if (shouldStop()) return;
   /* 289 */     }
   /* 290 */
   /* 291 */     // The right iterator has no more rows, join left row with null
   /* 292 */     while (smj_leftInputRow_0 != null || smj_leftInput_0.hasNext()) {
   /* 293 */       if (smj_leftInputRow_0 == null) {
   /* 294 */         smj_leftInputRow_0 = (InternalRow) smj_leftInput_0.next();
   /* 295 */       }
   /* 296 */
   /* 297 */       smj_leftOutputRow_0 = smj_leftInputRow_0;
   /* 298 */       smj_rightOutputRow_0 = null;
   /* 299 */       smj_leftInputRow_0 = null;
   /* 300 */       smj_consumeFullOuterJoinRow_0();
   /* 301 */
   /* 302 */       if (shouldStop()) return;
   /* 303 */     }
   /* 304 */
   /* 305 */     // The left iterator has no more rows, join right row with null
   /* 306 */     while (smj_rightInputRow_0 != null || smj_rightInput_0.hasNext()) {
   /* 307 */       if (smj_rightInputRow_0 == null) {
   /* 308 */         smj_rightInputRow_0 = (InternalRow) smj_rightInput_0.next();
   /* 309 */       }
   /* 310 */
   /* 311 */       smj_rightOutputRow_0 = smj_rightInputRow_0;
   /* 312 */       smj_leftOutputRow_0 = null;
   /* 313 */       smj_rightInputRow_0 = null;
   /* 314 */       smj_consumeFullOuterJoinRow_0();
   /* 315 */
   /* 316 */       if (shouldStop()) return;
   /* 317 */     }
   /* 318 */   }
   /* 319 */
   /* 320 */ }
   ```
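
The effect of the change can be illustrated with a toy model. The sketch below is not Spark's code generator: `VarCode` is a hypothetical stand-in for Spark's `ExprCode`, and `genNotEqual`, `conditionBefore`, and `conditionAfter` are invented names. It assumes, as the description says, that each conjunct emits the code of `k1` while that code is still attached. It shows how emitting the code once and passing code-free copies leaves a single declaration, while the original variable keeps its code for the later consume method.

```java
// Hypothetical stand-in for Spark's ExprCode: the snippet that computes a value
// plus the name of the generated Java variable that holds the result.
final class VarCode {
    final String code;   // Java source that declares `value`; "" if nothing is left to emit
    final String value;  // name of the generated variable

    VarCode(String code, String value) {
        this.code = code;
        this.value = value;
    }

    // Same variable name but no code, so referencing it emits no declaration.
    VarCode withoutCode() {
        return new VarCode("", value);
    }
}

final class ConditionCodegenSketch {
    // Emits one conjunct "NOT ((k1 + n) = k2)", preceded by any code still attached to k1.
    static String genNotEqual(VarCode k1, long n, String k2) {
        return k1.code + "boolean c" + n + " = !(" + k1.value + " + " + n + "L == " + k2 + ");\n";
    }

    // Toy model of the problem: both conjuncts carry the code of k1, so it is declared twice.
    static String conditionBefore(VarCode k1, String k2) {
        return genNotEqual(k1, 3, k2) + genNotEqual(k1, 5, k2);
    }

    // Toy model of the fix: emit the code of k1 once, then give code-free copies to the conjuncts.
    // The original k1 is left untouched so that a separate consume method can still emit it.
    static String conditionAfter(VarCode k1, String k2) {
        VarCode evaluated = k1.withoutCode();
        return k1.code + genNotEqual(evaluated, 3, k2) + genNotEqual(evaluated, 5, k2);
    }

    // Counts non-overlapping occurrences of needle in text.
    static int count(String text, String needle) {
        int n = 0;
        for (int i = text.indexOf(needle); i >= 0; i = text.indexOf(needle, i + needle.length())) {
            n++;
        }
        return n;
    }
}
```

In this model, `conditionBefore` yields two `long k1 = ...` declarations in the same scope, which a Java compiler rejects, while `conditionAfter` yields one.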



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160396078


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -364,10 +364,15 @@ case class ShuffledHashJoinExec(
          |${streamedKeyExprCode.code}
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
-
+    val copiedStreamedVars = streamedVars.map(v => v.copy())

Review Comment:
   Can we add a comment explaining why we copy here?





[GitHub] [spark] wankunde commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160439594


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala:
##########
@@ -95,4 +95,27 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec {
       }
     }
   }
+
+  /**
+   * Splits variables based on whether it's used by condition or not, returns the code to create

Review Comment:
   Yes, this moves the `splitVarsByCondition` method into `JoinCodegenSupport` so that it can be used in `ShuffledHashJoinExec`.
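
As a rough illustration of what a helper like `splitVarsByCondition` does, the sketch below partitions variable-loading snippets by whether the join condition references them. It is a simplified model, not the Spark method: the class name, the map-based signature, and the returned two-element array are assumptions made for this example.

```java
import java.util.Map;
import java.util.Set;

final class SplitVarsSketch {
    // varCode maps a column name to the Java snippet that loads it (hypothetical shape).
    // Returns {code to emit before the condition, code to emit after the condition}.
    static String[] splitByCondition(Map<String, String> varCode, Set<String> conditionRefs) {
        StringBuilder before = new StringBuilder();
        StringBuilder after = new StringBuilder();
        for (Map.Entry<String, String> e : varCode.entrySet()) {
            if (conditionRefs.contains(e.getKey())) {
                // The condition needs this variable, so it must be loaded first.
                before.append(e.getValue()).append('\n');
            } else {
                // Not needed by the condition; loading can wait until the condition passes.
                after.append(e.getValue()).append('\n');
            }
        }
        return new String[] { before.toString(), after.toString() };
    }
}
```

Deferring the second group means columns that the condition never reads are only loaded for rows that actually pass it.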





[GitHub] [spark] wankunde commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1159906763


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala:
##########
@@ -365,9 +365,21 @@ case class ShuffledHashJoinExec(
        """.stripMargin
     val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
 
+    // Evaluate the variables from the stream side and used in the condition but do not clear the
+    // code as they may be used in the following function.

Review Comment:
   
   > I don't quite understand why it's only a bug for full outer join. Inner join invokes `getJoinCondition` as well.
   
   I'm sorry, but SMJ inner join doesn't invoke `getJoinCondition`, does it?
   Also, inner join evaluates the variables before generating code for the condition expression.
   ```
         val (streamedBefore, streamedAfter) = splitVarsByCondition(streamedOutput, streamedVars)
         val (bufferedBefore, bufferedAfter) = splitVarsByCondition(bufferedOutput, bufferedVars)
   ```
   The parent operator consumes the join result in the same method, so those variables do not need to be evaluated again, whereas full outer join evaluates them again in the method `smj_consumeFullOuterJoinRow_0()`.
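
The scoping difference can be sketched in plain Java. This is a hand-written analogy, not generated Spark code: `ScopeSketch`, `loadK1`, and the `reads` counter are hypothetical names used only to count how often the column is loaded.

```java
final class ScopeSketch {
    private final long[] leftRow;  // stand-in for a buffered left row
    int reads = 0;                 // how many times column 1 has been loaded

    ScopeSketch(long[] leftRow) {
        this.leftRow = leftRow;
    }

    private long loadK1() {
        reads++;
        return leftRow[1];
    }

    // Inner-join style: the condition check and the consumer share one method,
    // so the local k1 is loaded once and stays in scope for both.
    long conditionAndConsumeInline(long k2) {
        long k1 = loadK1();
        if (k1 + 3 != k2 && k1 + 5 != k2) {
            return k1;  // the consumer reuses the same local
        }
        return -1L;
    }

    // Full-outer style: the consumer is a separate method. The caller's locals
    // are not visible here, so k1 has to be loaded from the row again.
    private long consume() {
        return loadK1();
    }

    long conditionThenConsume(long k2) {
        long k1 = loadK1();
        if (k1 + 3 != k2 && k1 + 5 != k2) {
            return consume();
        }
        return -1L;
    }
}
```

Because the separate consume method reloads the value itself, the code that loads it for the condition must not be marked as already emitted for the consumer.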





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate variables multiple times for SMJ and SHJ fullOuter join

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1160395746


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala:
##########
@@ -95,4 +95,27 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec {
       }
     }
   }
+
+  /**
+   * Splits variables based on whether it's used by condition or not, returns the code to create

Review Comment:
   I assume this just moves the code without any change.





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate more than once for the variables from the left side in the FullOuter SMJ condition

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1154099624


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##########
@@ -1036,8 +1036,17 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    // Evaluate the variables on the left and used in the condition but do not clear the code as

Review Comment:
   This is very hard to review. Is there any similar code in other join types or `ShuffledHashJoinExec`? It seems other places are using `splitVarsByCondition` and `evaluateVariables`.


