You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/17 05:50:20 UTC

[GitHub] [spark] c21 commented on a change in pull request #32210: [SPARK-32634][SQL] Introduce sort-based fallback for shuffled hash join (non-code-gen path)

c21 commented on a change in pull request #32210:
URL: https://github.com/apache/spark/pull/32210#discussion_r615212076



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
##########
@@ -107,266 +107,262 @@ case class SortMergeJoinExec(
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
-    // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
-    keys.map(SortOrder(_, Ascending))
-  }
-
   private def createLeftKeyGenerator(): Projection =
     UnsafeProjection.create(leftKeys, left.output)
 
   private def createRightKeyGenerator(): Projection =
     UnsafeProjection.create(rightKeys, right.output)
 
-  private def getSpillThreshold: Int = {
-    sqlContext.conf.sortMergeJoinExecBufferSpillThreshold
-  }
-
-  private def getInMemoryThreshold: Int = {
-    sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val spillThreshold = getSpillThreshold
     val inMemoryThreshold = getInMemoryThreshold
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
-      val boundCondition: (InternalRow) => Boolean = {
-        condition.map { cond =>
-          Predicate.create(cond, left.output ++ right.output).eval _
-        }.getOrElse {
-          (r: InternalRow) => true
-        }
+      executeJoinWithIterators(

Review comment:
       Just FYI: the original code in `doExecute()` has been moved, unchanged, into a new public method `executeJoinWithIterators()`. I'm not sure why GitHub shows so many code changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org