Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/30 00:12:37 UTC

[GitHub] [spark] c21 opened a new pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

c21 opened a new pull request #34444:
URL: https://github.com/apache/spark/pull/34444


   
   ### What changes were proposed in this pull request?
    This PR adds code-gen support for FULL OUTER shuffled hash join.
    
    The main change is in `ShuffledHashJoinExec.scala:doProduce()`, which now generates code for the FULL OUTER join case.
    * `ShuffledHashJoinExec.scala:codegenFullOuterJoinWithUniqueKey()` generates the join code for the case where the build-side join key is unique.
    * `ShuffledHashJoinExec.scala:codegenFullOuterJoinWithNonUniqueKey()` generates the join code for the case where the build-side join key is non-unique.
   
   Example query:
   
   ```
   val df1 = spark.range(5).select($"id".as("k1"))
   val df2 = spark.range(10).select($"id".as("k2"))
   df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "full_outer")
   ```
   
   Generated code for example query: https://gist.github.com/c21/828b782ee81827f4148939cb50314a7b
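    
    To inspect the generated code locally, here is a minimal sketch (assuming a `spark-shell` session; `debugCodegen()` comes from `org.apache.spark.sql.execution.debug`):
    
    ```
    // Sketch: print the whole-stage generated code for the example FULL OUTER
    // shuffled hash join query above.
    import org.apache.spark.sql.execution.debug._
    import spark.implicits._
    
    val df1 = spark.range(5).select($"id".as("k1"))
    val df2 = spark.range(10).select($"id".as("k2"))
    val joined = df1.join(df2.hint("SHUFFLE_HASH"),
      $"k1" === $"k2" % 3 && $"k1" + 3 =!= $"k2", "full_outer")
    
    // Prints each whole-stage codegen subtree together with its generated Java
    // source; the shuffled hash join shows up in one of the subtrees.
    joined.debugCodegen()
    ```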
   
   ### Why are the changes needed?
   Improve query performance for FULL OUTER shuffled hash join.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
    * Added unit test in `WholeStageCodegenSuite` (a hedged sketch of this kind of check is shown below the list).
   * Existing unit test in `OuterJoinSuite`.
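    
    A hedged sketch of the kind of check such a whole-stage-codegen test typically makes (hypothetical test body reusing the `joined` DataFrame from the sketch above; the actual test added in `WholeStageCodegenSuite` may differ):
    
    ```
    // Hypothetical sketch, not the test added in this PR: assert that the full
    // outer shuffled hash join is planned inside a WholeStageCodegenExec node.
    import org.apache.spark.sql.execution.WholeStageCodegenExec
    import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
    
    val plan = joined.queryExecution.executedPlan
    val codegenJoin = plan.find {
      case w: WholeStageCodegenExec =>
        w.collectFirst { case j: ShuffledHashJoinExec => j }.isDefined
      case _ => false
    }
    assert(codegenJoin.isDefined)
    ```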


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955111466


   **[Test build #144771 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144771/testReport)** for PR 34444 at commit [`e787a36`](https://github.com/apache/spark/commit/e787a362bf82c6ed464a858586880f0a2cca6e62).




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955156454


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49245/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956882474








[GitHub] [spark] AmplabJenkins removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955123457


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49240/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955157743


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49245/
   




[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741525180



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - sorry if I am missing anything. `anyNull` returns whether the current input row has any null field (`UnsafeRow.anyNull()`), so the result varies for each input row in the while loop.






[GitHub] [spark] SparkQA removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955144498


   **[Test build #144776 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144776/testReport)** for PR 34444 at commit [`e787a36`](https://github.com/apache/spark/commit/e787a362bf82c6ed464a858586880f0a2cca6e62).




[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741612543



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - no worries, thanks for the careful review.






[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955825020


   @cloud-fan could you help take a look when you have time? Thanks.




[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740786516



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       Isn't `anyNull` called more frequently than `keyIsUnique`? We call `anyNull` in the while loop.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956882474








[GitHub] [spark] cloud-fan commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-958626996


   thanks, merging to master!





[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956882474


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49289/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955143080


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144771/
   




[GitHub] [spark] cloud-fan closed pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #34444:
URL: https://github.com/apache/spark/pull/34444


   





[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740045221



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"

Review comment:
       This is so short. I think we can just inline it, instead of creating a variable and passing it to the methods.
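       
       A minimal sketch of the suggested inlining (reusing the variables already in scope in the codegen helper; a sketch, not necessarily the final PR code):
       
       ```
       // Hedged sketch of the suggestion: drop the `stopCheck` parameter and emit
       // the short snippet literally where the generated loop is built.
       val joinStreamSide =
         s"""
            |while ($streamedInput.hasNext()) {
            |  $streamedRow = (InternalRow) $streamedInput.next();
            |  // ... key lookup and condition check as in the diff above ...
            |  $consumeFullOuterJoinRow();
            |  if (shouldStop()) return;
            |}
          """.stripMargin
       ```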






[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740038668



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))

Review comment:
       `rewriteKeyExpr` does not change the expression references. I think we can just do `streamedKeys.map(_.references)` here.
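       
       A minimal sketch of the suggested simplification (same names as in `doProduce` above; a sketch, not necessarily the final PR code):
       
       ```
       // Hedged sketch of the reviewer's suggestion: rewriteKeyExpr preserves
       // attribute references, so the AttributeSet can be built from streamedKeys
       // directly instead of going through HashJoin.rewriteKeyExpr first.
       val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
         AttributeSet.fromAttributeSets(streamedKeys.map(_.references)))
       ```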






[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740534742



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"

Review comment:
       @cloud-fan - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       I want to avoid a per-row call of `UnsafeHashedRelation.keyIsUnique()`, so I store the result of the call in the `keyIsUnique` variable and call it only once for all rows (example - [line 079 of `shj_keyIsUnique_0`](https://gist.github.com/c21/828b782ee81827f4148939cb50314a7b)). Although the `UnsafeHashedRelation.keyIsUnique()` call is very cheap now (just `binaryMap.numKeys() == binaryMap.numValues()`), caching it prevents a future bug in case the call becomes expensive when we change the implementation later, and it also reads better.
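       
       For reference, a sketch juxtaposing the two pieces of the diff this thread is about (names as in `doProduce` above): the relation-level `keyIsUnique()` result is cached once per task via inlined mutable state, while the row-level `anyNull()` check is emitted inside the generated while loop and re-evaluated for every streamed row.
       
       ```
       // keyIsUnique(): relation-level, so its result is cached once in the
       // generated class's init code and only the cached boolean is branched on.
       val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
         v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
       
       // anyNull(): row-level, so the call is embedded in the per-row key lookup
       // that ends up inside the generated while loop.
       val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
       
       // The dispatch between the two codegen paths happens once, on the cached flag.
       s"""
          |if ($keyIsUnique) {
          |  $joinWithUniqueKey
          |} else {
          |  $joinWithNonUniqueKey
          |}
        """.stripMargin
       ```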

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))

Review comment:
       @cloud-fan - yes, thanks, updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"

Review comment:
       @cloud-fan - agree, updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"
+    val isRowMatched = s"$matchedRowSet.contains($rowIndex)"

Review comment:
       @cloud-fan - updated.






[GitHub] [spark] cloud-fan closed pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #34444:
URL: https://github.com/apache/spark/pull/34444


   




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955111466


   **[Test build #144771 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144771/testReport)** for PR 34444 at commit [`e787a36`](https://github.com/apache/spark/commit/e787a362bf82c6ed464a858586880f0a2cca6e62).




[GitHub] [spark] Tagar commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
Tagar commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956538097


   Just out of curiosity, how much performance gain does this code generation bring?




[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740046205



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"

Review comment:
       ditto, let's inline the short code.
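
   For context, the `rowIndex` expression quoted above packs the build row's key index and value index into a single 64-bit value, so matched build rows can be remembered in an `OpenHashSet[Long]`. A minimal Scala sketch of the same packing (the names are illustrative, not from the PR):

   ```
   // Sketch only: mirrors the generated `(((long) keyIndex) << 32) | valueIndex`
   // expression, assuming both indices are non-negative ints.
   def packRowIndex(keyIndex: Int, valueIndex: Int): Long =
     (keyIndex.toLong << 32) | (valueIndex & 0xFFFFFFFFL)

   def unpackKeyIndex(rowIndex: Long): Int = (rowIndex >>> 32).toInt
   def unpackValueIndex(rowIndex: Long): Int = rowIndex.toInt

   // packRowIndex(3, 7) == 0x0000000300000007L; the matched-row set then only has to
   // store one Long per (key index, value index) pair.
   ```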






[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956587980


   > Just out of curiosity, how much performance gain does this code generation bring?
   
   @Tagar - I ran a small micro-benchmark (similar to Spark's `JoinBenchmark.scala`), and it shows a ~10-20% runtime improvement for the given query. I did notice that the improvement varies from run to run, and I don't expect real production queries to gain as much, so please take the numbers only as a rough reference.
   
   ```
   val N: Long = 4 << 20
   withSQLConf(
     SQLConf.SHUFFLE_PARTITIONS.key -> "2",
     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
     SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
     codegenBenchmark("shuffle hash join", N) {
       val df1 = spark.range(N).selectExpr(s"id as k1")
       val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
       val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
       assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
       df.noop()
     }
   }
   
   Running benchmark: shuffle hash join
     Running case: shuffle hash join wholestage off
     Stopped after 2 iterations, 3051 ms
     Running case: shuffle hash join wholestage on
     Stopped after 5 iterations, 6638 ms
   
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
   Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
   shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ------------------------------------------------------------------------------------------------------------------------
   shuffle hash join wholestage off                   1519           1526          10          2.8         362.2       1.0X
   shuffle hash join wholestage on                    1273           1328          70          3.3         303.4       1.2X
   ```




[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741612543



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - no worries, thanks for the careful review.






[GitHub] [spark] SparkQA removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956639649


   **[Test build #144819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144819/testReport)** for PR 34444 at commit [`5dcd5db`](https://github.com/apache/spark/commit/5dcd5db632185d3b53a1179ae8dd1cbc46003bb1).




[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741525180



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - sorry if I am missing anything, but `anyNull` returns whether any field of the current input row is null (`UnsafeRow.anyNull()`). The result varies for each input row in the while loop. Just to make sure we are on the same page, I am referring to the `anyNull` calls in [lines 094 & 157 of the example code-gen](https://gist.github.com/c21/828b782ee81827f4148939cb50314a7b).
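
   In interpreted terms, the `anyNull()` guard encodes SQL's null semantics for equi-join keys: a streamed row whose join key contains a null can never match a build-side key, so the generated code skips the `HashedRelation` lookup and later emits that row with nulls on the build side. A rough Scala sketch of the equivalent logic (illustrative names, not Spark's API):

   ```
   // Sketch only: skip the hash lookup when any join-key column is null, since a
   // null key cannot equal any build-side key under SQL equi-join semantics.
   def lookupBuildRow[V](streamedKey: Seq[Any], hashTable: Map[Seq[Any], V]): Option[V] =
     if (streamedKey.exists(_ == null)) None   // row is still output, with a null build side
     else hashTable.get(streamedKey)

   // lookupBuildRow(Seq(1, null), Map(Seq[Any](1, 2) -> "build row")) returns None
   ```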






[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740534742



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"

Review comment:
       @cloud-fan - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       I want to avoid a per-row call to `UnsafeHashedRelation.keyIsUnique()`, so I store the result in the `keyIsUnique` variable and call the method only once for all rows (example - [line 079, `shj_keyIsUnique_0`](https://gist.github.com/c21/828b782ee81827f4148939cb50314a7b)). The `UnsafeHashedRelation.keyIsUnique()` call is very cheap today (just `binaryMap.numKeys() == binaryMap.numValues()`), but caching it prevents a future bug if the call ever becomes expensive after an implementation change, and it also reads better.
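
   The resulting shape is a single check up front rather than per row: the generated code evaluates `keyIsUnique` once when the operator is initialized and then branches into one of the two specialized join loops. A rough Scala sketch of that shape (assumed names, not the generated Java):

   ```
   // Sketch only: evaluate the per-task property once and branch outside the row loop,
   // instead of calling keyIsUnique() for every streamed row.
   def fullOuterJoinSketch(
       keyIsUniqueOnce: () => Boolean,          // stands in for HashedRelation.keyIsUnique()
       streamedRows: Iterator[AnyRef],
       joinWithUniqueKey: AnyRef => Unit,
       joinWithNonUniqueKey: AnyRef => Unit): Unit = {
     val keyIsUnique = keyIsUniqueOnce()        // called once, not per row
     if (keyIsUnique) streamedRows.foreach(joinWithUniqueKey)
     else streamedRows.foreach(joinWithNonUniqueKey)
   }
   ```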

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))

Review comment:
       @cloud-fan - yes, thanks, updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"

Review comment:
       @cloud-fan - agree, updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"
+    val isRowMatched = s"$matchedRowSet.contains($rowIndex)"

Review comment:
       @cloud-fan - updated.






[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741594274



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       ah I misread the code, nvm






[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956639649


   **[Test build #144819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144819/testReport)** for PR 34444 at commit [`5dcd5db`](https://github.com/apache/spark/commit/5dcd5db632185d3b53a1179ae8dd1cbc46003bb1).




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955144498


   **[Test build #144776 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144776/testReport)** for PR 34444 at commit [`e787a36`](https://github.com/apache/spark/commit/e787a362bf82c6ed464a858586880f0a2cca6e62).




[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955144426


   retest this please




[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741594274



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       ah I misread the code, nvm






[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740786516



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       Isn't `anyNull` called more frequently than `keyIsUnique`? We call `anyNull` in the while loop.






[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r741525180



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - sorry if I am missing anything, but `anyNull` returns whether the current input row contains any null (`UnsafeRow.anyNull()`), so the result varies for each input row in the while loop.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - sorry if I am missing anything, but `anyNull` returns whether the current input row contains any null (`UnsafeRow.anyNull()`), so the result varies for each input row in the while loop. Just to make sure we are on the same page, I am referring to the `anyNull` calls at lines 094 & 157 of the [example code-gen](https://gist.github.com/c21/828b782ee81827f4148939cb50314a7b).
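
   A minimal, hypothetical Scala sketch of the point above (the names `Relation`, `Row`, and `probeLoop` are illustrative stand-ins, not Spark classes or the actual generated code): `keyIsUnique` is a property of the hashed relation and therefore loop-invariant, so codegen can cache it once per task in a mutable state field, while the `anyNull` check depends on each streamed row's join key and has to be re-evaluated on every iteration.

   ```
   // Illustrative sketch only; Relation and Row stand in for the hashed relation and a streamed row.
   object KeyIsUniqueSketch {
     final case class Relation(keyIsUnique: Boolean)
     final case class Row(key: Option[Long])

     def probeLoop(relation: Relation, streamedRows: Iterator[Row]): Unit = {
       // Loop-invariant: computed once per task, hence kept in a field by codegen.
       val keyIsUnique = relation.keyIsUnique
       while (streamedRows.hasNext) {
         val row = streamedRows.next()
         // Row-dependent: re-evaluated for every streamed row, so there is nothing to cache.
         val streamedKeyAnyNull = row.key.isEmpty
         if (!streamedKeyAnyNull) {
           // ... probe the hashed relation, branching on keyIsUnique ...
         }
       }
     }
   }
   ```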

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       @cloud-fan - no worries, thanks for the careful review.






[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956587980


   > Just out of curiosity, how much performance gain does this code generation bring?
   
   @Tagar - I ran a small micro benchmark (similar to Spark's `JoinBenchmark.scala`); it gives roughly a 10-20% runtime improvement for the given query. The improvement varies from run to run, and I don't expect real production queries to see as much improvement, so please take the numbers as a reference only.
   
   ```
   val N: Long = 4 << 20
   withSQLConf(
     SQLConf.SHUFFLE_PARTITIONS.key -> "2",
     SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
     SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
     codegenBenchmark("shuffle hash join", N) {
       val df1 = spark.range(N).selectExpr(s"id as k1")
       val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
       val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
       assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
       df.noop()
     }
   }
   
   Running benchmark: shuffle hash join
     Running case: shuffle hash join wholestage off
     Stopped after 2 iterations, 3051 ms
     Running case: shuffle hash join wholestage on
     Stopped after 5 iterations, 6638 ms
   
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
   Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
   shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ------------------------------------------------------------------------------------------------------------------------
   shuffle hash join wholestage off                   1519           1526          10          2.8         362.2       1.0X
   shuffle hash join wholestage on                    1273           1328          70          3.3         303.4       1.2X
   ```




[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-958647134


   Thank you @cloud-fan for review!




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-957035534


   **[Test build #144819 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144819/testReport)** for PR 34444 at commit [`5dcd5db`](https://github.com/apache/spark/commit/5dcd5db632185d3b53a1179ae8dd1cbc46003bb1).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.





[GitHub] [spark] cloud-fan commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-958626996


   thanks, merging to master!





[GitHub] [spark] AmplabJenkins removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955157743


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49245/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955143080


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144771/
   





[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740041930



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       why does `keyIsUnique` need a mutable state but `anyNull` does not?







[GitHub] [spark] AmplabJenkins removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956882474


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49289/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955123457


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49240/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955179168


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144776/
   





[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955139519


   **[Test build #144771 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144771/testReport)** for PR 34444 at commit [`e787a36`](https://github.com/apache/spark/commit/e787a362bf82c6ed464a858586880f0a2cca6e62).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.





[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-957042169


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144819/
   





[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956852154


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49289/
   




[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-958647134


   Thank you @cloud-fan for the review!




[GitHub] [spark] c21 commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740534742



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"

Review comment:
       @cloud-fan - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"

Review comment:
       I want to avoid a per-row call of `UnsafeHashedRelation.keyIsUnique()`, so the result of the call is stored in the `keyIsUnique` variable and evaluated only once for all rows (example - [line 079, `shj_keyIsUnique_0`](https://gist.github.com/c21/828b782ee81827f4148939cb50314a7b)). The `UnsafeHashedRelation.keyIsUnique()` call is very cheap right now (just `binaryMap.numKeys() == binaryMap.numValues()`), but caching it guards against a future bug if the call ever becomes expensive after an implementation change, and the generated code also reads better.
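
       A minimal standalone Scala sketch of the pattern described above (this is not the generated Java nor Spark's API; the `Relation` case class, its fields, and the two `process*Row` placeholders are hypothetical stand-ins): evaluate the uniqueness check once per task and branch on the cached result.

       ```
       object KeyIsUniqueCachingSketch {
         // Hypothetical stand-in for the hashed relation; keyIsUnique is cheap today,
         // but caching its result guards against it becoming expensive later.
         final case class Relation(keys: Seq[Long], values: Seq[Long]) {
           def keyIsUnique: Boolean = keys.distinct.size == values.size
         }

         def fullOuterJoin(streamedRows: Iterator[Long], relation: Relation): Unit = {
           val keyIsUnique = relation.keyIsUnique // single call per task, like shj_keyIsUnique_0
           if (keyIsUnique) {
             streamedRows.foreach(processUniqueKeyRow)
           } else {
             streamedRows.foreach(processNonUniqueKeyRow)
           }
         }

         // Placeholders for the two join bodies emitted by the real codegen.
         private def processUniqueKeyRow(row: Long): Unit = ()
         private def processNonUniqueKeyRow(row: Long): Unit = ()
       }
       ```

       The shape mirrors the generated `if (shj_keyIsUnique_0) { ... } else { ... }` wrapper around the two join loops.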

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))

Review comment:
       @cloud-fan - yes, thanks, updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"

Review comment:
       @cloud-fan - agree, updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"
+    val isRowMatched = s"$matchedRowSet.contains($rowIndex)"

Review comment:
       @cloud-fan - updated.
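
       As a side note on the `rowIndex` encoding in the hunk above, here is a small standalone sketch, assuming non-negative key and value indexes (the object name and the explicit lower-32-bit mask are illustrative additions, not part of the PR):

       ```
       object RowIndexPackingSketch {
         // Pack a (keyIndex, valueIndex) pair into one Long, mirroring
         // `(((long) keyIndex) << 32) | valueIndex` in the generated code, so a
         // matched build-side row can be recorded in a set of Longs.
         def pack(keyIndex: Int, valueIndex: Int): Long =
           (keyIndex.toLong << 32) | (valueIndex.toLong & 0xFFFFFFFFL)

         // Recover the halves; handy only for sanity-checking the encoding.
         def keyIndexOf(packed: Long): Int = (packed >>> 32).toInt
         def valueIndexOf(packed: Long): Int = packed.toInt
       }
       ```

       For example, `pack(3, 7)` records value 7 under key index 3, and `keyIndexOf(pack(3, 7)) == 3`.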






[GitHub] [spark] Tagar commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
Tagar commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956538097


   Just out of curiosity, how much of a performance gain does this code generation bring?




[GitHub] [spark] cloud-fan closed pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #34444:
URL: https://github.com/apache/spark/pull/34444


   




[GitHub] [spark] AmplabJenkins commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956882474








[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955151191


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49245/
   




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955177858


   **[Test build #144776 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/144776/testReport)** for PR 34444 at commit [`e787a36`](https://github.com/apache/spark/commit/e787a362bf82c6ed464a858586880f0a2cca6e62).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740031392



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)

Review comment:
       `prepareRelation` returns `keyIsUnique`, so it is a constant before we generate the code. We don't need to get it at runtime in the generated code.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)

Review comment:
       `prepareRelation` returns `keyIsUnique`, so it is a constant before we generate the code. We don't need to get it at runtime in the generated code.
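
       To illustrate (a hedged sketch with made-up names, not the actual `doProduce` signature): when the flag is already known while generating code, the generator can emit just one of the two join bodies and skip the runtime `if (keyIsUnique)` check entirely.

       ```
       object CodegenBranchSketch {
         // keyIsUnique is a codegen-time constant here, so only one join body
         // string is emitted into the generated source.
         def produceFullOuterJoin(
             keyIsUnique: Boolean,
             uniqueKeyBody: String,
             nonUniqueKeyBody: String): String =
           if (keyIsUnique) uniqueKeyBody else nonUniqueKeyBody
       }
       ```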






[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956730810


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49289/
   




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955118394


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49240/
   




[GitHub] [spark] SparkQA commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-955123449


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49240/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34444:
URL: https://github.com/apache/spark/pull/34444#discussion_r740046613



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -332,6 +327,266 @@ case class ShuffledHashJoinExec(
     HashedRelationInfo(relationTerm, keyIsUnique = false, isEmpty = false)
   }
 
+  override def doProduce(ctx: CodegenContext): String = {
+    // Specialize `doProduce` code for full outer join, because full outer join needs to
+    // iterate streamed and build side separately.
+    if (joinType != FullOuter) {
+      return super.doProduce(ctx)
+    }
+
+    val HashedRelationInfo(relationTerm, _, _) = prepareRelation(ctx)
+
+    // Inline mutable state since not many join operations in a task
+    val keyIsUnique = ctx.addMutableState("boolean", "keyIsUnique",
+      v => s"$v = $relationTerm.keyIsUnique();", forceInline = true)
+    val streamedInput = ctx.addMutableState("scala.collection.Iterator", "streamedInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val buildInput = ctx.addMutableState("scala.collection.Iterator", "buildInput",
+      v => s"$v = $relationTerm.valuesWithKeyIndex();", forceInline = true)
+    val streamedRow = ctx.addMutableState("InternalRow", "streamedRow", forceInline = true)
+    val buildRow = ctx.addMutableState("InternalRow", "buildRow", forceInline = true)
+
+    // Generate variables and related code from streamed side
+    val streamedVars = genOneSideJoinVars(ctx, streamedRow, streamedPlan, setDefaultValue = false)
+    val streamedKeyVariables = evaluateRequiredVariables(streamedOutput, streamedVars,
+      AttributeSet.fromAttributeSets(HashJoin.rewriteKeyExpr(streamedKeys).map(_.references)))
+    ctx.currentVars = streamedVars
+    val streamedKeyExprCode = GenerateUnsafeProjection.createCode(ctx, streamedBoundKeys)
+    val streamedKeyEv =
+      s"""
+         |$streamedKeyVariables
+         |${streamedKeyExprCode.code}
+       """.stripMargin
+    val streamedKeyAnyNull = s"${streamedKeyExprCode.value}.anyNull()"
+
+    // Generate code for join condition
+    val (_, conditionCheck, _) =
+      getJoinCondition(ctx, streamedVars, streamedPlan, buildPlan, Some(buildRow))
+
+    // Generate code for result output in separate function, as we need to output result from
+    // multiple places in join code.
+    val streamedResultVars = genOneSideJoinVars(
+      ctx, streamedRow, streamedPlan, setDefaultValue = true)
+    val buildResultVars = genOneSideJoinVars(
+      ctx, buildRow, buildPlan, setDefaultValue = true)
+    val resultVars = buildSide match {
+      case BuildLeft => buildResultVars ++ streamedResultVars
+      case BuildRight => streamedResultVars ++ buildResultVars
+    }
+    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
+    ctx.addNewFunction(consumeFullOuterJoinRow,
+      s"""
+         |private void $consumeFullOuterJoinRow() {
+         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
+         |  ${consume(ctx, resultVars)}
+         |}
+       """.stripMargin)
+    val stopCheck = "if (shouldStop()) return;"
+
+    val joinWithUniqueKey = codegenFullOuterJoinWithUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+    val joinWithNonUniqueKey = codegenFullOuterJoinWithNonUniqueKey(
+      ctx, (streamedRow, buildRow), (streamedInput, buildInput), streamedKeyEv, streamedKeyAnyNull,
+      streamedKeyExprCode.value, relationTerm, conditionCheck, stopCheck, consumeFullOuterJoinRow)
+
+    s"""
+       |if ($keyIsUnique) {
+       |  $joinWithUniqueKey
+       |} else {
+       |  $joinWithNonUniqueKey
+       |}
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with unique join keys.
+   * This is code-gen version of `fullOuterJoinWithUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedKeySetClsName = classOf[BitSet].getName
+    val matchedKeySet = ctx.addMutableState(matchedKeySetClsName, "matchedKeySet",
+      v => s"$v = new $matchedKeySetClsName($relationTerm.maxNumKeysIndex());", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val foundMatch = ctx.freshName("foundMatch")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val joinStreamSide =
+      s"""
+         |while ($streamedInput.hasNext()) {
+         |  $streamedRow = (InternalRow) $streamedInput.next();
+         |
+         |  // generate join key for stream side
+         |  $streamedKeyEv
+         |
+         |  // find matches from HashedRelation
+         |  boolean $foundMatch = false;
+         |  $buildRow = null;
+         |  $rowWithIndexClsName $rowWithIndex = $streamedKeyAnyNull ? null:
+         |    $relationTerm.getValueWithKeyIndex($streamedKeyValue);
+         |
+         |  if ($rowWithIndex != null) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    // check join condition
+         |    $conditionCheck {
+         |      // set key index in matched keys set
+         |      $matchedKeySet.set($rowWithIndex.getKeyIndex());
+         |      $foundMatch = true;
+         |    }
+         |
+         |    if (!$foundMatch) {
+         |      $buildRow = null;
+         |    }
+         |  }
+         |
+         |  $consumeFullOuterJoinRow();
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    val filterBuildSide =
+      s"""
+         |$streamedRow = null;
+         |
+         |// find non-matched rows from HashedRelation
+         |while ($buildInput.hasNext()) {
+         |  $rowWithIndexClsName $rowWithIndex = ($rowWithIndexClsName) $buildInput.next();
+         |
+         |  // check if key index is not in matched keys set
+         |  if (!$matchedKeySet.get($rowWithIndex.getKeyIndex())) {
+         |    $buildRow = $rowWithIndex.getValue();
+         |    $consumeFullOuterJoinRow();
+         |  }
+         |
+         |  $stopCheck
+         |}
+       """.stripMargin
+
+    s"""
+       |$joinStreamSide
+       |$filterBuildSide
+     """.stripMargin
+  }
+
+  /**
+   * Generates the code for full outer join with non-unique join keys.
+   * This is code-gen version of `fullOuterJoinWithNonUniqueKey()`.
+   */
+  private def codegenFullOuterJoinWithNonUniqueKey(
+      ctx: CodegenContext,
+      rows: (String, String),
+      inputs: (String, String),
+      streamedKeyEv: String,
+      streamedKeyAnyNull: String,
+      streamedKeyValue: ExprValue,
+      relationTerm: String,
+      conditionCheck: String,
+      stopCheck: String,
+      consumeFullOuterJoinRow: String): String = {
+    // Inline mutable state since not many join operations in a task
+    val matchedRowSetClsName = classOf[OpenHashSet[_]].getName
+    val matchedRowSet = ctx.addMutableState(matchedRowSetClsName, "matchedRowSet",
+      v => s"$v = new $matchedRowSetClsName(scala.reflect.ClassTag$$.MODULE$$.Long());",
+      forceInline = true)
+    val prevKeyIndex = ctx.addMutableState("int", "prevKeyIndex",
+      v => s"$v = -1;", forceInline = true)
+    val valueIndex = ctx.addMutableState("int", "valueIndex",
+      v => s"$v = -1;", forceInline = true)
+    val rowWithIndexClsName = classOf[ValueRowWithKeyIndex].getName
+    val rowWithIndex = ctx.freshName("rowWithIndex")
+    val buildIterator = ctx.freshName("buildIterator")
+    val foundMatch = ctx.freshName("foundMatch")
+    val keyIndex = ctx.freshName("keyIndex")
+    val (streamedRow, buildRow) = rows
+    val (streamedInput, buildInput) = inputs
+
+    val rowIndex = s"(((long)$keyIndex) << 32) | $valueIndex"
+    val markRowMatched = s"$matchedRowSet.add($rowIndex);"
+    val isRowMatched = s"$matchedRowSet.contains($rowIndex)"

Review comment:
       ditto






[GitHub] [spark] c21 commented on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-956587980


   > Just out of curiosity, how much performance gain this code generation brings?
   
   @Tagar - I ran a small micro-benchmark (similar to Spark's `JoinBenchmark.scala`); it shows roughly a 10-20% runtime improvement for the query below. The improvement varies from run to run, and I don't expect real production queries to gain that much, so please take the numbers only as a reference.
   
   ```
    val N: Long = 4 << 20
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      codegenBenchmark("shuffle hash join", N) {
        val df1 = spark.range(N).selectExpr(s"id as k1")
        val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
        val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
        assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
        df.noop()
      }
    }
   
   Running benchmark: shuffle hash join
     Running case: shuffle hash join wholestage off
     Stopped after 2 iterations, 3051 ms
     Running case: shuffle hash join wholestage on
     Stopped after 5 iterations, 6638 ms
   
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
   Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
   shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   ------------------------------------------------------------------------------------------------------------------------
   shuffle hash join wholestage off                   1519           1526          10          2.8         362.2       1.0X
   shuffle hash join wholestage on                    1273           1328          70          3.3         303.4       1.2X
   ```




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34444: [SPARK-32567][SQL] Add code-gen for full outer shuffled hash join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34444:
URL: https://github.com/apache/spark/pull/34444#issuecomment-957042169


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144819/
   

