Posted to reviews@spark.apache.org by zecevicp <gi...@git.apache.org> on 2018/04/19 21:16:21 UTC

[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

GitHub user zecevicp opened a pull request:

    https://github.com/apache/spark/pull/21109

    [SPARK-24020][SQL] Sort-merge join inner range optimization

    ## What changes were proposed in this pull request?
    
    The main changes are to ExtractEquiJoinKeys, SortMergeJoinExec and the new InMemoryUnsafeRowQueue class.
    Equi-join key extraction (the ExtractEquiJoinKeys object) now recognizes the case where the join condition contains an equi-join plus a range condition on a second column (e.g. t1.x = t2.x AND t1.y BETWEEN t2.y - d1 AND t2.y + d2).
    SortMergeJoinExec implements the inner range optimization: the range condition (its lower and upper limits) is used to iterate through the rows of the right relation with a moving window, which advances as the values of the current left row change.
    The moving window is implemented by the InMemoryUnsafeRowQueue class (a new class added as part of this PR).
    
    ## How was this patch tested?
    Unit tests (InnerRangeSuite.scala)
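
For readers who want to see the moving-window idea in isolation, here is a minimal sketch. It is not the PR's implementation: `Row`, `joinGroup`, and the use of `mutable.Queue` are assumptions made for illustration, and it handles only a single equi-join key group whose two sides are already sorted ascending by the range column `y`.

```scala
import scala.collection.mutable

object RangeWindowJoinSketch {
  // Hypothetical row type: x is the equi-join key, y is the range column.
  final case class Row(x: Int, y: Int)

  /** Joins the rows of one equi-key group. Both inputs must be sorted by y ascending.
    * Emits (l, r) pairs that satisfy l.y BETWEEN r.y - d1 AND r.y + d2,
    * which is the same as l.y - d2 <= r.y <= l.y + d1.
    */
  def joinGroup(left: Seq[Row], right: Seq[Row], d1: Int, d2: Int): Seq[(Row, Row)] = {
    val window = mutable.Queue.empty[Row]   // right rows that may still match
    val rightIter = right.iterator.buffered
    val out = Seq.newBuilder[(Row, Row)]
    for (l <- left) {
      // Grow the window: pull in right rows up to the upper bound for this left row.
      while (rightIter.hasNext && rightIter.head.y <= l.y + d1) {
        window.enqueue(rightIter.next())
      }
      // Shrink the window: rows below the lower bound can never match a later left row,
      // because left rows arrive in ascending y order.
      while (window.nonEmpty && window.head.y < l.y - d2) {
        window.dequeue()
      }
      window.foreach(r => out += ((l, r)))
    }
    out.result()
  }
}
```

Each right row is enqueued and dequeued at most once, so the window avoids rescanning every right row of the key group for every left row.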


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dirac-institute/spark branch-pz-smj

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21109
    
----
commit de2f3e27eb94e4a3d7d27e910e7892eb57d2240d
Author: Petar Zecevic <pz...@...>
Date:   2018-03-22T09:25:19Z

    Secondary sort Sort Merge Join optimization. Not finished yet.

commit 44c58fbaab61852b12ef1e2a91a192b32259d9ab
Author: Petar Zecevic <pz...@...>
Date:   2018-03-22T09:56:33Z

    SortMergeJoin secondary sort optimization

commit 4ba041e9746e331f139c8b263b6e4ab2e8e3a4d8
Author: Petar Zecevic <pz...@...>
Date:   2018-04-04T18:17:47Z

    Sort-Merge "inner range join" (secondary sort) - code generation

commit 1a08c438a1082de860f472582fef12eecd6d1e02
Author: Petar Zecevic <pz...@...>
Date:   2018-04-05T18:24:59Z

    Sort-Merge "inner range join" (secondary sort) - two bug fixes - works now

commit 7580d30ce3900ec8e905b43c449911424e21458f
Author: Petar Zecevic <pz...@...>
Date:   2018-04-09T13:20:15Z

    Code simplification

commit 6dd439a8a364429cefee36839450ae67e37a616b
Author: Petar Zecevic <pz...@...>
Date:   2018-04-10T09:56:21Z

    Bug fix

commit 74f26c475087f32b0f99d15c4b613a5de92948ad
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T06:45:11Z

    Scalastyle fixes

commit 395b2bdcd68c7159b58d7ce5de6abae7159f3714
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T06:45:43Z

    Scalastyle fixes

commit 088fe6518c7dfa58b1f15be17935d570e84afe5d
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T06:46:28Z

    SMJ range join unit tests

commit e83771617b6d5f155f3c2ad5e8119a534536cb8f
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T15:32:13Z

    Scalastyle

commit fd309939e193459cbf4aee683b570df1e4d9a87a
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T18:32:16Z

    Scalastyle

commit 78e2805f279e4ffd471a7365be2f47d379a7d933
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T18:36:12Z

    Scalastyle

commit c17514933b9e4c62f4b61ad2145520984f6f9ddd
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T18:47:55Z

    Fix generated code - dequeue method missing

commit 68a11bbbea0c880096affddef587c860f347bf4f
Author: Petar Zecevic <pz...@...>
Date:   2018-04-13T18:53:24Z

    Bug fix: include other binary comparisons in range conditions match

commit e6cd129ebd0efdb415cea6bcedb05f55a29d0f82
Author: Petar Zecevic <pz...@...>
Date:   2018-04-16T16:31:19Z

    Test fix: sortWithinPartitions; Bug Fix: check references in rangeConditions, not columns

commit 142e77fc7019ffc8cc34d8891ee08d05de7e0e34
Author: Petar Zecevic <pz...@...>
Date:   2018-04-16T23:58:44Z

    Test fix

commit 7a26afee0ebc29e28dc8156f716eee39422fd07d
Author: Petar Zecevic <pz...@...>
Date:   2018-04-17T00:19:49Z

    Test fix

commit 0c3595cc4f443057c18b9757253364d61c65bf27
Author: Petar Zecevic <pz...@...>
Date:   2018-04-18T22:26:48Z

    Fix required child ordering for inner range queries

commit c6de4476f7be80e9513ab962d30675bd7be70177
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T16:45:07Z

    Parameter for turning off inner range optimization

commit aa995f797ac61dc8de7683b65e0c1c104682f372
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T18:21:41Z

    Scala style

commit 79cdc479f309ca6b7dc74ddbcd51ea43bda0bfbd
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T19:20:32Z

    Bug fix - NPE when inner range optimization turned off

commit 41ae8de3ad5351f74dfb3c74cd5ef7c382f68e22
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T19:21:01Z

    Adding test case when inner range optimization is turned off

commit 326aefec4f10c0dc94b70d74fb27681303253944
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T19:24:49Z

    Stala style

commit dd04487a11aad75cb728750c650a96819f86b9d7
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T19:27:28Z

    Stala style

commit 4457e85a0ea39b5a3d7b0258b7c212d98f455e76
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T19:40:31Z

    Remove range condition extraction when inner range join optimization is disabled

commit 6e09bf722c24d8e375f1bd85df0901d5344e3aec
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T19:42:11Z

    Scala style

commit 48be06c79c9ad80f505e9ab35a3d76521039693c
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T20:44:58Z

    Unit test fix

commit 4c6a726bac71796d7210cfde9cf762dfdc38d165
Author: Petar Zecevic <pz...@...>
Date:   2018-04-19T20:55:50Z

    Unit test fix

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188265911
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
    -    // Copy the left keys as class members so they could be used in next function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    --- End diff --
    
    Can you elaborate please? I'm not sure what you mean.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92436/
    Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188258797
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    +                                    joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    +      val equiset = joinKeys.filter{ case (ljk : Expression, rjk : Expression) =>
    +        ljk.references.toSeq.contains(lattrs(0)) && rjk.references.toSeq.contains(rattrs(0)) }
    +      if (equiset.isEmpty) {
    +        "asis"
    --- End diff --
    
    OK, makes sense.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91760 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91760/testReport)** for PR 21109 at commit [`5c64b55`](https://github.com/apache/spark/commit/5c64b55766630ffec33ac3c82ce10703dc3d526c).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92472 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92472/testReport)** for PR 21109 at commit [`39247ba`](https://github.com/apache/spark/commit/39247bac0de645aa959cc7fd11a27e36532181a5).


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188246236
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
    -    // Copy the left keys as class members so they could be used in next function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    --- End diff --
    
    do we really need this? can't we just use the returned name?
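
One possible reading of this comment, offered as an assumption, is that the generated call site could use the name returned by `ctx.addNewFunction` rather than forcing the function into the outer class with `inlineToOuterClass = true`. The toy sketch below illustrates that pattern only; `ToyContext`, its placement rule, and the `nestedClassInstance` prefix are hypothetical stand-ins and not Spark's `CodegenContext`.

```scala
import scala.collection.mutable

object ReturnedNameSketch {
  // Hypothetical stand-in for a codegen context (not Spark's CodegenContext).
  // It keeps at most `maxOuterFunctions` functions in the outer class and moves
  // the rest to a nested class, returning the name a caller must use.
  final class ToyContext(maxOuterFunctions: Int) {
    private val outer = mutable.ArrayBuffer.empty[String]
    private val nested = mutable.ArrayBuffer.empty[String]

    def addNewFunction(
        name: String,
        code: String,
        inlineToOuterClass: Boolean = false): String = {
      if (inlineToOuterClass || outer.size < maxOuterFunctions) {
        outer += code
        name // callable directly from the outer class
      } else {
        nested += code
        s"nestedClassInstance.$name" // qualified name for the nested class
      }
    }
  }

  // Build the call site from the returned name instead of hard-coding the function name.
  def emitCall(ctx: ToyContext): String = {
    val fn = ctx.addNewFunction(
      "dequeueUntilUpperConditionHolds",
      "private void dequeueUntilUpperConditionHolds() { }")
    s"$fn();"
  }
}
```

With this shape the call site stays correct wherever the context decides to place the function, so the caller does not need to pin the placement.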


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193736438
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala ---
    @@ -70,27 +70,41 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
         (3, 2)
       ).toDF("a", "b")
     
    +  private lazy val rangeTestData1 = Seq(
    +    (1, 3), (1, 4), (1, 7), (1, 8), (1, 10),
    +    (2, 1), (2, 2), (2, 3), (2, 8),
    +    (3, 1), (3, 2), (3, 3), (3, 5),
    +    (4, 1), (4, 2), (4, 3)
    +  ).toDF("a", "b")
    +
    +  private lazy val rangeTestData2 = Seq(
    +    (1, 1), (1, 2), (1, 2), (1, 3), (1, 5), (1, 7), (1, 20),
    +    (2, 1), (2, 2), (2, 3), (2, 5), (2, 6),
    +    (3, 3), (3, 6)
    +  ).toDF("a", "b")
    +
       // Note: the input dataframes and expression must be evaluated lazily because
       // the SQLContext should be used only within a test to keep SQL tests stable
       private def testInnerJoin(
    -      testName: String,
    -      leftRows: => DataFrame,
    -      rightRows: => DataFrame,
    -      condition: () => Expression,
    -      expectedAnswer: Seq[Product]): Unit = {
    +                             testName: String,
    +                             leftRows: => DataFrame,
    +                             rightRows: => DataFrame,
    +                             condition: () => Expression,
    +                             expectedAnswer: Seq[Product],
    +                             expectRangeJoin: Boolean = false): Unit = {
     
         def extractJoinParts(): Option[ExtractEquiJoinKeys.ReturnType] = {
           val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition()))
           ExtractEquiJoinKeys.unapply(join)
         }
     
         def makeBroadcastHashJoin(
    -        leftKeys: Seq[Expression],
    -        rightKeys: Seq[Expression],
    -        boundCondition: Option[Expression],
    -        leftPlan: SparkPlan,
    -        rightPlan: SparkPlan,
    -        side: BuildSide) = {
    +                               leftKeys: Seq[Expression],
    --- End diff --
    
    (Undo this whitespace change and the next one)


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193734550
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); GreaterThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); LessThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); LessThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case _ => None
    +            }
    +          } else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    --- End diff --
    
    Instead of checking `.size == 0`, something like `rangeConditions.forall(... not instance of either ...)`?
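
A small sketch of the suggested rewrite follows. The `Cmp` hierarchy is a hypothetical stand-in for Catalyst's `LessThan`, `LessThanOrEqual`, `GreaterThan` and `GreaterThanOrEqual`, and the method names are made up for illustration. It only shows that the `filter(...).size == 0` form and the `forall` form agree.

```scala
object RangeConditionCheckSketch {
  // Hypothetical stand-ins for Catalyst's binary comparison expressions.
  sealed trait Cmp
  case object Lt extends Cmp
  case object LtEq extends Cmp
  case object Gt extends Cmp
  case object GtEq extends Cmp

  private def isUpper(c: Cmp): Boolean = c == Lt || c == LtEq
  private def isLower(c: Cmp): Boolean = c == Gt || c == GtEq

  // Shape used in the diff: build a filtered collection, then compare its size to zero.
  def missingBoundViaFilter(cs: Seq[Cmp]): Boolean =
    cs.size != 2 || cs.filter(isUpper).size == 0 || cs.filter(isLower).size == 0

  // Suggested shape: forall states the intent directly and allocates no
  // intermediate collection.
  def missingBoundViaForall(cs: Seq[Cmp]): Boolean =
    cs.size != 2 || cs.forall(c => !isUpper(c)) || cs.forall(c => !isLower(c))
}
```

An equivalent spelling is `!cs.exists(isUpper)`, which stops at the first match.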


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92467/
    Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193762830
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import java.util.ConcurrentModificationException
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.{SparkEnv, TaskContext}
    +import org.apache.spark.memory.TaskMemoryManager
    +import org.apache.spark.serializer.SerializerManager
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
    +import org.apache.spark.storage.BlockManager
    +
    +/**
    + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
    + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
    + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
    + * excessive memory consumption). Setting these threshold involves following trade-offs:
    + *
    + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
    + *   than is available, resulting in OOM.
    + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
    + *   excessive disk writes. This may lead to a performance regression compared to the normal case
    + *   of using an [[ArrayBuffer]] or [[Array]].
    + */
    +private[sql] class InMemoryUnsafeRowQueue(
    +    taskMemoryManager: TaskMemoryManager,
    +    blockManager: BlockManager,
    +    serializerManager: SerializerManager,
    +    taskContext: TaskContext,
    +    initialSize: Int,
    +    pageSizeBytes: Long,
    +    numRowsInMemoryBufferThreshold: Int,
    +    numRowsSpillThreshold: Int)
    +  extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager,
    +      blockManager,
    +      serializerManager,
    +      taskContext,
    +      initialSize,
    +      pageSizeBytes,
    +      numRowsInMemoryBufferThreshold,
    +      numRowsSpillThreshold) {
    +
    +  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
    +    this(
    +      TaskContext.get().taskMemoryManager(),
    +      SparkEnv.get.blockManager,
    +      SparkEnv.get.serializerManager,
    +      TaskContext.get(),
    +      1024,
    +      SparkEnv.get.memoryManager.pageSizeBytes,
    +      numRowsInMemoryBufferThreshold,
    +      numRowsSpillThreshold)
    +  }
    +
    +  private val initialSizeOfInMemoryBuffer =
    +    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)
    +
    +  private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) {
    +    new mutable.Queue[UnsafeRow]()
    +  } else {
    +    null
    +  }
    +
    +//  private var spillableArray: UnsafeExternalSorter = _
    --- End diff --
    
    nit: Is this comment necessary?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91912/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92467 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92467/testReport)** for PR 21109 at commit [`7f7ab25`](https://github.com/apache/spark/commit/7f7ab257bc14f5b529de6c22b45f00bc2724ab20).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90477/
    Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89961/testReport)** for PR 21109 at commit [`e6e6628`](https://github.com/apache/spark/commit/e6e6628bf3d63e0486c2ba90c03712aa0eade013).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188251314
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    We need to separate range conditions that are relevant for the optimizations from other join conditions. `rangePreds` is used later to remove range predicates from "other predicates" (if any). 
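    
    The separation described here can be sketched roughly as follows. This is an illustration on a simplified predicate model, not the Catalyst `Expression` types used in the PR; `Pred`, `RangePred`, `OtherPred` and `AndPred` are hypothetical names:
    
    ```scala
    object SplitPredicatesSketch {
      // Hypothetical, simplified predicate model.
      sealed trait Pred
      case class RangePred(desc: String) extends Pred
      case class OtherPred(desc: String) extends Pred
      case class AndPred(left: Pred, right: Pred) extends Pred
    
      // Returns the range predicates usable by the optimization and the
      // remaining predicates folded into a single optional condition.
      def split(preds: Seq[Pred]): (Seq[Pred], Option[Pred]) = {
        val (rangePreds, rest) = preds.partition(_.isInstanceOf[RangePred])
        (rangePreds, rest.reduceOption((a, b) => AndPred(a, b)))
      }
    }
    ```
    
    If every predicate is a range predicate, the second component is `None`, which mirrors the "(if any)" case above.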


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91533/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92247: Deflake Build #92089 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92247/testReport)** for PR 21109 at commit [`9889ba1`](https://github.com/apache/spark/commit/9889ba1ddbf0bfcb4d48b890634b6389ac4bd535).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    I added benchmark code in `JoinBenchmark`. The tests show an 8x improvement over the non-optimized code, although the results depend on the exact range conditions and on the calculations performed on each matched row.
    In our case, we were not able to cross-match two rather large datasets (1.2 billion rows x 800 million rows) without this optimization. With the optimization, the cross-match finishes in less than 2 minutes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #94631 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94631/testReport)** for PR 21109 at commit [`af59b8a`](https://github.com/apache/spark/commit/af59b8a285ffe968d87cdf1bf9758b142e0bcfd4).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90642/
    Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92435/testReport)** for PR 21109 at commit [`48c3929`](https://github.com/apache/spark/commit/48c392906488167141355ac4e0ef41c449357280).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92467 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92467/testReport)** for PR 21109 at commit [`7f7ab25`](https://github.com/apache/spark/commit/7f7ab257bc14f5b529de6c22b45f00bc2724ab20).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Do we have a design doc? I have a couple of high-level questions:
    1. What if the range is big and the queue runs out of memory (OOM)?
    2. Can't we apply it to a cartesian join if there is no equality condition? And what about broadcast join?
    3. If the equi-join key is skewed, then we are effectively doing this sort-merge range join in a single thread, even if the range join key is not skewed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r184837526
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala ---
    @@ -222,6 +222,61 @@ class JoinBenchmark extends BenchmarkBase {
          */
       }
     
    +  val expensiveFunc = (first: Int, second: Int) => {
    +    for (i <- 1 to 2000) {
    +      Math.sqrt(i * i * i)
    +    }
    +    Math.abs(first - second)
    +  }
    +
    +  def innerRangeTest(N: Int, M: Int): Unit = {
    +    import sparkSession.implicits._
    +    val expUdf = sparkSession.udf.register("expensiveFunc", expensiveFunc)
    +    val df1 = sparkSession.sparkContext.parallelize(1 to M).
    +      cartesian(sparkSession.sparkContext.parallelize(1 to N)).
    +      toDF("col1a", "col1b")
    +    val df2 = sparkSession.sparkContext.parallelize(1 to M).
    +      cartesian(sparkSession.sparkContext.parallelize(1 to N)).
    +      toDF("col2a", "col2b")
    +    val df = df1.join(df2, 'col1a === 'col2a and ('col1b < 'col2b + 3) and ('col1b > 'col2b - 3))
    +    assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
    +    df.where(expUdf('col1b, 'col2b) < 3).count()
    +  }
    +
    +  ignore("sort merge inner range join") {
    +    sparkSession.conf.set("spark.sql.join.smj.useInnerRangeOptimization", "false")
    +    val N = 2 << 5
    +    val M = 100
    +    runBenchmark("sort merge inner range join", N * M) {
    +      innerRangeTest(N, M)
    +    }
    +
    +    /*
    +     *AMD EPYC 7401 24-Core Processor
    +     *sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +     *---------------------------------------------------------------------------------------------
    +     *sort merge join wholestage off            13822 / 14068          0.0     2159662.3       1.0X
    +     *sort merge join wholestage on               3863 / 4226          0.0      603547.0       3.6X
    +     */
    +  }
    +
    +  ignore("sort merge inner range join optimized") {
    +    sparkSession.conf.set("spark.sql.join.smj.useInnerRangeOptimization", "true")
    +    val N = 2 << 5
    +    val M = 100
    +    runBenchmark("sort merge inner range join optimized", N * M) {
    +      innerRangeTest(N, M)
    +    }
    +
    +    /*
    +     *AMD EPYC 7401 24-Core Processor
    +     *sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +     *---------------------------------------------------------------------------------------------
    +     *sort merge join wholestage off            12723 / 12800          0.0     1988008.4       1.0X
    --- End diff --
    
    Why doesn't the wholestage-off case get as much improvement as the wholestage-on case?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188240371
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    --- End diff --
    
    remove the comment please


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188230817
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -97,13 +100,13 @@ object PhysicalOperation extends PredicateHelper {
      * value).
      */
     object ExtractEquiJoinKeys extends Logging with PredicateHelper {
    -  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
    +  /** (joinType, leftKeys, rightKeys, rangeConditions, condition, leftChild, rightChild) */
       type ReturnType =
    -    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
    +    (JoinType, Seq[Expression], Seq[Expression], Seq[BinaryComparison],
    +      Option[Expression], LogicalPlan, LogicalPlan)
     
       def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
         case join @ Join(left, right, joinType, condition) =>
    -      logDebug(s"Considering join on: $condition")
    --- End diff --
    
    Why remove this debug log?


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193735968
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); GreaterThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); LessThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); LessThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case _ => None
    +            }
    +          } else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  /**
    +   * Checks if l and r are valid range conditions:
    +   *   - l and r expressions should both contain a single reference to one and the same column.
    +   *   - the referenced column should not be part of joinKeys
    +   * If these conditions are not met, the function returns None.
    +   *
    +   * Otherwise, the function checks if the left plan contains l expression and the right plan
    +   * contains r expression. If the expressions need to be switched, the function returns Some(true)
    +   * and Some(false) otherwise.
    +   */
    +  private def checkRangeConditions(l : Expression, r : Expression,
    +      left : LogicalPlan, right : LogicalPlan,
    +      joinKeys : Seq[(Expression, Expression)]) = {
    --- End diff --
    
    For clarity add a return type to this method. `Option[Boolean]`?
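    
    A rough sketch of what an explicit `Option[Boolean]` return type communicates, following the contract in the doc comment above. It assumes columns are modelled as plain strings rather than Catalyst expressions and plans; `checkRange` and its parameters are hypothetical simplifications, not the PR's `checkRangeConditions`:
    
    ```scala
    object CheckRangeSketch {
      // None        => not a usable range condition
      // Some(false) => l belongs to the left side and r to the right side
      // Some(true)  => the operands must be switched
      def checkRange(
          l: String,
          r: String,
          leftCols: Set[String],
          rightCols: Set[String],
          joinKeyCols: Set[String]): Option[Boolean] = {
        if (joinKeyCols.contains(l) || joinKeyCols.contains(r)) {
          None // columns already used as equi-join keys are excluded
        } else if (leftCols.contains(l) && rightCols.contains(r)) {
          Some(false)
        } else if (leftCols.contains(r) && rightCols.contains(l)) {
          Some(true)
        } else {
          None
        }
      }
    }
    ```
    
    With the return type spelled out, a reader of the call sites can see why `.map { case true => ...; case false => ... }` is applied to the result.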


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @maropu Regarding the first point, whether this belongs to SMJ or not, take a look at this [paper](https://ieeexplore.ieee.org/document/5447873/), page 11. They describe several special cases of SMJ, one of them being "epsilon-join", which is exactly what is implemented here.
    
    Regarding adding dummy join keys (some kind of binning) to do spatial or temporal joining, that wouldn't help in our case and I believe in many other cases because you would need to bin the data by the second column in a fixed manner. And there you would have the problems of how to join data beyond the borders of those bins (which would probably require additional data duplication). These bins would probably be calculated on the fly, so additional computing is required, and most probably an additional sorting step would be needed (additional pass through the data).
    
    Regarding making it a simpler change, although the number of changed lines is somewhat large, the change is confined to a specific code path and only a minimal amount of existing code is modified. Most of the changes are new additions: new code for code generation; the `InMemoryUnsafeRowQueue` class, which is used only from inside the new code; and the `SortMergeJoinInnerRangeScanner` class, which should be used when whole-stage codegen is turned off, but currently has a difficult-to-debug bug and is disabled.
    
    The only thing that could be removed currently is the `SortMergeJoinInnerRangeScanner` class, which is 230 lines long. It could be moved to a separate JIRA PR (SMJ inner-range optimization with whole-stage codegen turned off). I tried to find the bug there but I failed, so separating this makes sense. I'll give it a few more tries, and if I fail again, I will move this part to a different JIRA PR.
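    
    For readers unfamiliar with the epsilon-join, the moving-window idea can be sketched as follows for a single equi-join key group. This is an illustrative simplification on plain integers, not the PR's `SortMergeJoinExec` or `InMemoryUnsafeRowQueue` code; `EpsilonJoinSketch`, `join`, `below` and `above` are hypothetical names, and both inputs are assumed to be sorted ascending:
    
    ```scala
    import scala.collection.mutable
    
    object EpsilonJoinSketch {
      // Emits every pair (l, r) with l - below <= r <= l + above.
      // Each right row is enqueued once and dequeued at most once.
      def join(left: Seq[Int], right: Seq[Int], below: Int, above: Int): Seq[(Int, Int)] = {
        val window = mutable.Queue.empty[Int]
        val rightIter = right.iterator.buffered
        val out = mutable.ArrayBuffer.empty[(Int, Int)]
        for (l <- left) {
          // Advance the upper edge: pull in right rows that are not above the range.
          while (rightIter.hasNext && rightIter.head <= l + above) {
            window.enqueue(rightIter.next())
          }
          // Advance the lower edge: drop rows that have fallen below the range.
          while (window.nonEmpty && window.head < l - below) {
            window.dequeue()
          }
          // Everything left in the window matches the current left row.
          for (r <- window) out += ((l, r))
        }
        out.toSeq
      }
    }
    ```
    
    For example, `EpsilonJoinSketch.join(Seq(1, 5, 10), Seq(0, 2, 4, 6, 9, 12), 1, 1)` yields the pairs (1,0), (1,2), (5,4), (5,6) and (10,9). Because the window only moves forward, no fixed bins are needed and rows near a would-be bin border need no duplication.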


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @mgaido91 Regarding the amount of code, maybe you can suggest how to reduce it? Because I don't see a way...
    I think the code is well contained (mostly in separate new classes) and is not contaminating the existing codebase. There is a simple switch (parameter) that turns off the whole thing.
    Some code also went into the use cases which contribute to the amount of changes.
    I believe that the potentially significant speedup justifies the line count.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #90475 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90475/testReport)** for PR 21109 at commit [`535d0d6`](https://github.com/apache/spark/commit/535d0d63b33ef320fded52c18b2716a1333255ce).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by shaneknapp <gi...@git.apache.org>.
Github user shaneknapp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    test this please


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91900 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91900/testReport)** for PR 21109 at commit [`6d72fe0`](https://github.com/apache/spark/commit/6d72fe0466f210e001a67829e0f42379abc7e4f0).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    I managed to fix the code path that is executed when whole-stage codegen is turned off. Both code paths now give the same results and have the optimization implemented. I also changed the tests in the `InnerJoinSuite` class so that they run with whole-stage codegen both off and on (which wasn't the case so far).
    I updated the benchmark results in `JoinBenchmark`. The results are now the following.
    Without inner range optimization:
    ```
    sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------
    sort merge join wholestage off            25226 / 25244          0.0       61585.9       1.0X
    sort merge join wholestage on              8581 /  8983          0.0       20948.6       2.9X
    ```
    With inner range optimization:
    ```
    sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------
    sort merge join wholestage off              1194 / 1212          0.3        2915.2       1.0X
    sort merge join wholestage on                814 /  867          0.5        1988.4       1.5X
    ```
    So, there is a 10x improvement for the wholestage ON case and a 21x improvement for the wholestage OFF case.
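    
    The speedup figures can be re-derived from the "Best" times in the two tables. The snippet below is only an arithmetic check; the object and value names are made up for illustration and are not part of `JoinBenchmark`. It gives roughly 21.1x with wholestage off and 10.5x with wholestage on.
    
    ```scala
    object SpeedupCheck {
      // Best times in ms, copied from the two benchmark tables above.
      val baselineOff = 25226.0
      val baselineOn = 8581.0
      val optimizedOff = 1194.0
      val optimizedOn = 814.0
    
      // Speedup is simply the ratio of the time before to the time after.
      def speedup(before: Double, after: Double): Double = before / after
    
      def main(args: Array[String]): Unit = {
        println(f"wholestage off: ${speedup(baselineOff, optimizedOff)}%.1fx")
        println(f"wholestage on:  ${speedup(baselineOn, optimizedOn)}%.1fx")
      }
    }
    ```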
    
    I believe this is now ready to be merged, which would greatly help us in our projects.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91917/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92435/testReport)** for PR 21109 at commit [`48c3929`](https://github.com/apache/spark/commit/48c392906488167141355ac4e0ef41c449357280).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91785 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91785/testReport)** for PR 21109 at commit [`fb99390`](https://github.com/apache/spark/commit/fb99390a4baf62ce953be5a78c552adae98c1ffa).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92038 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92038/testReport)** for PR 21109 at commit [`66d7cbf`](https://github.com/apache/spark/commit/66d7cbf76ec0170bd8e78d4b936e6c7650998f34).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    I think range join is a very important feature, but the current implementation is not robust enough (no spilling to disk). Have you explored other, simpler solutions? I vaguely remember that some papers describe a way to rewrite a range join as an equi-join during the logical phase.
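    
    One commonly described form of such a rewrite is binning: bucket the range column, replicate each left row into the neighbouring buckets, equi-join on (key, bucket), and keep the exact range predicate as a post-filter. Whether this is the rewrite meant here is an assumption. The sketch below uses plain Scala collections rather than Catalyst, and `Row`, `naive` and `binned` are hypothetical names.
    
    ```scala
    object BinnedRangeJoin {
      final case class Row(key: Int, y: Int)
    
      // Brute-force reference: equi-join on key plus |l.y - r.y| <= d.
      def naive(left: Seq[Row], right: Seq[Row], d: Int): Set[(Row, Row)] =
        (for (l <- left; r <- right if l.key == r.key && math.abs(l.y - r.y) <= d)
          yield (l, r)).toSet
    
      // Rewrite: equi-join on (key, bin) with bins of width d, probing the left
      // row's own bin and its two neighbours, then apply the exact predicate.
      def binned(left: Seq[Row], right: Seq[Row], d: Int): Set[(Row, Row)] = {
        require(d > 0, "bin width must be positive")
        def bin(y: Int): Int = Math.floorDiv(y, d)
        val rightByBin: Map[(Int, Int), Seq[Row]] = right.groupBy(r => (r.key, bin(r.y)))
        (for {
          l <- left
          b <- (bin(l.y) - 1) to (bin(l.y) + 1)
          r <- rightByBin.getOrElse((l.key, b), Seq.empty)
          if math.abs(l.y - r.y) <= d
        } yield (l, r)).toSet
      }
    }
    ```
    
    Any right row within distance d of a left row falls in the left row's bin or an adjacent one, so probing three bins is sufficient; the post-filter removes the false positives.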


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89961 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89961/testReport)** for PR 21109 at commit [`e6e6628`](https://github.com/apache/spark/commit/e6e6628bf3d63e0486c2ba90c03712aa0eade013).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89934/
    Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90475/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94630/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    That is a restriction if you regard this as an implementation of a range join, which it is not. This is an *optimization of an equi join*. The intention never was for it to work as a range join. 
    I can investigate the logical rewrite approach, but as a part of a different JIRA ticket, one which would actually be related to a range join.
    Thanks for looking into this, though.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    The code path with the optimization but without whole-stage code generation gives wrong results, and I haven't been able to figure out where the bug is. I spent several hours on this again today. I've been using this code as an example:
    ```
    spark.conf.set("spark.sql.codegen.wholeStage", value = false)
    spark.conf.set("spark.sql.shuffle.partitions", "1")
    val df1 = sc.parallelize(1 to 2).cartesian(sc.parallelize(1 to 10)).toDF("col1a", "col1b")
    val df2 = sc.parallelize(1 to 2).cartesian(sc.parallelize(1 to 10)).toDF("col2a", "col2b")
    val res = df1.join(df2, 'col1a === 'col2a and ('col1b < 'col2b + 3) and ('col1b > 'col2b - 3))
    res.count
    ```
    With wholeStage off this gives 65 as a result, and 88 if wholeStage is on (with the optimization turned on). Without the optimization it is always 88, which is correct.
    So, if someone can figure out where the bug is, great. If not, maybe it's best to just turn the optimization off for the non-whole-stage case (for now).
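    
    The expected count of 88 can be confirmed without Spark by brute-forcing the same predicate over the same data. This is a plain-Scala check under that assumption only; `ExpectedCount` is a name made up for illustration.
    
    ```scala
    object ExpectedCount {
      // Same data as df1/df2 above: the cartesian product of 1..2 and 1..10.
      val rows: Seq[(Int, Int)] = for (a <- 1 to 2; b <- 1 to 10) yield (a, b)
    
      // Nested-loop evaluation of the join condition:
      // col1a === col2a and col1b < col2b + 3 and col1b > col2b - 3
      def expected(): Int = {
        val matches = for {
          (a1, b1) <- rows
          (a2, b2) <- rows
          if a1 == a2 && b1 < b2 + 3 && b1 > b2 - 3
        } yield 1
        matches.size
      }
    
      def main(args: Array[String]): Unit = println(expected()) // prints 88
    }
    ```
    
    Per key there are 10 pairs with equal values, 18 with a difference of 1 and 16 with a difference of 2, so 44 per key and 88 for the two keys.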


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193763364
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import java.util.ConcurrentModificationException
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.{SparkEnv, TaskContext}
    +import org.apache.spark.memory.TaskMemoryManager
    +import org.apache.spark.serializer.SerializerManager
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
    +import org.apache.spark.storage.BlockManager
    +
    +/**
    + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
    + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
    + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
    + * excessive memory consumption). Setting these threshold involves following trade-offs:
    + *
    + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
    + *   than is available, resulting in OOM.
    + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
    + *   excessive disk writes. This may lead to a performance regression compared to the normal case
    + *   of using an [[ArrayBuffer]] or [[Array]].
    + */
    +private[sql] class InMemoryUnsafeRowQueue(
    +    taskMemoryManager: TaskMemoryManager,
    +    blockManager: BlockManager,
    +    serializerManager: SerializerManager,
    +    taskContext: TaskContext,
    +    initialSize: Int,
    +    pageSizeBytes: Long,
    +    numRowsInMemoryBufferThreshold: Int,
    +    numRowsSpillThreshold: Int)
    +  extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager,
    +      blockManager,
    +      serializerManager,
    +      taskContext,
    +      initialSize,
    +      pageSizeBytes,
    +      numRowsInMemoryBufferThreshold,
    +      numRowsSpillThreshold) {
    +
    +  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
    +    this(
    +      TaskContext.get().taskMemoryManager(),
    +      SparkEnv.get.blockManager,
    +      SparkEnv.get.serializerManager,
    +      TaskContext.get(),
    +      1024,
    +      SparkEnv.get.memoryManager.pageSizeBytes,
    +      numRowsInMemoryBufferThreshold,
    +      numRowsSpillThreshold)
    +  }
    +
    +  private val initialSizeOfInMemoryBuffer =
    +    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)
    +
    +  private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) {
    +    new mutable.Queue[UnsafeRow]()
    +  } else {
    +    null
    +  }
    +
    +//  private var spillableArray: UnsafeExternalSorter = _
    --- End diff --
    
    No, it's not. Thank you


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92436 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92436/testReport)** for PR 21109 at commit [`497aa52`](https://github.com/apache/spark/commit/497aa52c9a0e15b1fdf0674c9b9510deef9e5dc1).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    I implemented spilling over to disk, added tests for it, and tested it on live data. Everything works well (it was actually easier than I thought at first). I also removed the `InMemoryUnsafeRowQueue` class and implemented the moving window changes in the `ExternalAppendOnlyUnsafeRowArray` class. It is no longer "append only", but I didn't want to change the name too.
    
    Unit tests are passing and it merges cleanly so I hope this can now be merged.
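    
    For readers unfamiliar with the moving window idea, here is a minimal in-memory sketch for a single equi-join key group. It is not the code in this PR: it uses a plain `mutable.Queue` of `Int` values, does not model spilling, and `MovingWindowJoin`, `lower` and `upper` are illustrative names. It assumes both inputs are sorted ascending on the range column.
    
    ```scala
    import scala.collection.mutable
    
    object MovingWindowJoin {
      // Emits every pair (l, r) with l - lower <= r <= l + upper.
      // Both `left` and `right` must be sorted ascending.
      def join(left: Seq[Int], right: Seq[Int], lower: Int, upper: Int): Seq[(Int, Int)] = {
        val window = mutable.Queue.empty[Int]
        val rightIter = right.iterator.buffered
        val out = Seq.newBuilder[(Int, Int)]
        for (l <- left) {
          // Admit right rows that have come within the upper bound.
          while (rightIter.hasNext && rightIter.head <= l + upper) window.enqueue(rightIter.next())
          // Evict right rows below the lower bound; later left rows are not
          // smaller, so these rows can never match again.
          while (window.nonEmpty && window.head < l - lower) window.dequeue()
          window.foreach(r => out += ((l, r)))
        }
        out.result()
      }
    }
    ```
    
    Each right row is enqueued and dequeued at most once, so the work is proportional to the input size plus the number of emitted pairs, instead of scanning the whole key group for every left row.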


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #99907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99907/testReport)** for PR 21109 at commit [`07ff4d3`](https://github.com/apache/spark/commit/07ff4d3a69967e13438dd8bd3e4130bf23b65c7d).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91900/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #99907 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99907/testReport)** for PR 21109 at commit [`07ff4d3`](https://github.com/apache/spark/commit/07ff4d3a69967e13438dd8bd3e4130bf23b65c7d).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92435/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92436 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92436/testReport)** for PR 21109 at commit [`497aa52`](https://github.com/apache/spark/commit/497aa52c9a0e15b1fdf0674c9b9510deef9e5dc1).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188241998
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    +                                    joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    --- End diff --
    
    Why not return an `Option` and make this a `None`?


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188242730
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    +                                    joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    +      val equiset = joinKeys.filter{ case (ljk : Expression, rjk : Expression) =>
    +        ljk.references.toSeq.contains(lattrs(0)) && rjk.references.toSeq.contains(rattrs(0)) }
    +      if (equiset.isEmpty) {
    +        "asis"
    --- End diff --
    
    Can we return something more meaningful than this string? Maybe an `Option[Boolean]` is enough, telling whether to reverse or not.
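    
    A rough sketch of that shape, replacing the "asis" / "vs" / "none" strings from the diff above with `Some(false)` / `Some(true)` / `None`. The types here are simplified, hypothetical stand-ins and not Catalyst's `Expression` or `BinaryComparison`, and `shouldReverse` is an invented name, so this only illustrates the return type.
    
    ```scala
    object RangeConditionSketch {
      // Hypothetical stand-in for "which side of the join an operand belongs to".
      sealed trait Side
      case object LeftSide extends Side
      case object RightSide extends Side
    
      // Hypothetical stand-ins for the comparison expressions.
      sealed trait Cmp { def l: Side; def r: Side }
      final case class LessThan(l: Side, r: Side) extends Cmp
      final case class GreaterThan(l: Side, r: Side) extends Cmp
    
      // None: not a usable range condition.
      // Some(false): keep the comparison as is. Some(true): swap the operands.
      def shouldReverse(l: Side, r: Side): Option[Boolean] = (l, r) match {
        case (LeftSide, RightSide) => Some(false)
        case (RightSide, LeftSide) => Some(true)
        case _ => None
      }
    
      // Normalizes a comparison so that the left-side operand comes first.
      def normalize(c: Cmp): Option[Cmp] = shouldReverse(c.l, c.r).map { reverse =>
        c match {
          case LessThan(l, r) => if (reverse) GreaterThan(r, l) else c
          case GreaterThan(l, r) => if (reverse) LessThan(r, l) else c
        }
      }
    }
    ```
    
    With an `Option`, the caller can use `map` or `flatMap` instead of matching on string literals, and a mistyped literal can no longer fall silently into the `case _ => None` branch.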


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91533 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91533/testReport)** for PR 21109 at commit [`9a0b2ab`](https://github.com/apache/spark/commit/9a0b2abf482a51c1e9901ce5297c9ca1b7961765).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Retest this please


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92472/
    Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    retest this please


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91785 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91785/testReport)** for PR 21109 at commit [`fb99390`](https://github.com/apache/spark/commit/fb99390a4baf62ce953be5a78c552adae98c1ffa).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Since I feel this is a limited case, I'm not sure this optimization needs to be handled in SMJ. For spatial or temporal use cases, isn't it enough to add dummy join keys that split the tasks into pieces, as a workaround?
    ```
    'col1a === 'col2a and 'col1dummyKey === 'col2dummyKey and ('col1b < 'col2b + 3) and ('col1b > 'col2b - 3)
    ```
    Btw, can you fix this issue with a simpler code change? (I'm not sure the performance gain justifies such a big change...)
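
Note: one way to read the dummy-key workaround above is to bucket the range column and join on the bucket as an extra equality key. The sketch below uses plain Scala collections rather than Spark. `DummyKeySketch`, `Row`, `bucket` and the bucket width of 3 (taken from the quoted condition) are assumptions for illustration. Because a match may fall into a neighbouring bucket, each left row is probed against three buckets, and the range predicate is still applied afterwards.

```scala
object DummyKeySketch {
  final case class Row(a: Int, b: Int)

  // Bucket width equal to the half-width of the range in the quoted condition.
  val width = 3

  // Dummy join key: the bucket that a value of column b falls into.
  def bucket(b: Int): Int = Math.floorDiv(b, width)

  // Equi-join on (a, bucket), then apply the original range predicate.
  def joinWithDummyKey(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] = {
    val rightByKey = right.groupBy(r => (r.a, bucket(r.b)))
    for {
      l <- left
      // |l.b - r.b| < width implies the two buckets differ by at most 1.
      k <- Seq(bucket(l.b) - 1, bucket(l.b), bucket(l.b) + 1)
      r <- rightByKey.getOrElse((l.a, k), Seq.empty)
      if l.b < r.b + width && l.b > r.b - width
    } yield (l, r)
  }

  // Reference result: check every pair of rows.
  def naiveJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if l.a == r.a && l.b < r.b + width && l.b > r.b - width
    } yield (l, r)
}
```

The trade-off in this sketch is that one side is effectively replicated three times, and the bucket width has to be chosen per query from the range bounds.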



---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91913/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #90642 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90642/testReport)** for PR 21109 at commit [`48b1c0e`](https://github.com/apache/spark/commit/48b1c0e73d3db1c406b719a2343a9c395627bea2).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Implementing spill-over seems like a lot of work because this is a queue. If data is spilled to disk and you need to pop from the queue, it is not clear to me what the best way to do that is. Do you spill only one part of the queue (so that you can add or pop more efficiently)? Which part (the beginning or the end)? Or maybe the middle? What is the threshold for bringing it back from disk to memory? And other similar questions...
    But I think the queue can be expected to consume much less memory than the original `ExternalAppendOnlyUnsafeRowArray`, because the queue's purpose IS to reduce the number of rows in memory, so spill-over would rarely be needed (that would depend, of course, on the user's range condition). 
    That's why implementing spill-over doesn't seem critical to me. I can try to implement it if everybody thinks it's really needed, but as I said, it's not clear (to me) what the best approach would be.
    
    Regarding the second point, this is not an ordinary range join, but an equi-join with a secondary range condition.
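
Note: the moving-window idea behind the queue can be sketched as follows for the rows that share a single equi-join key. This is a simplified illustration, not the PR's `InMemoryUnsafeRowQueue`: it uses `scala.collection.mutable.Queue` over plain `Int` values, a symmetric range `d`, and the hypothetical names `MovingWindowSketch` and `rangeMatches`. Both inputs are assumed to be sorted ascending on the range column.

```scala
import scala.collection.mutable

object MovingWindowSketch {
  // For each left value l, emits the right values r with l - d <= r <= l + d.
  // Both `left` and `right` must be sorted in ascending order.
  def rangeMatches(left: Seq[Int], right: Seq[Int], d: Int): Seq[(Int, Int)] = {
    val window = mutable.Queue.empty[Int]   // right rows currently in range
    val rightIter = right.iterator.buffered
    val out = Seq.newBuilder[(Int, Int)]
    for (l <- left) {
      // Advance the upper edge: enqueue right rows up to l + d.
      while (rightIter.hasNext && rightIter.head <= l + d) {
        window.enqueue(rightIter.next())
      }
      // Advance the lower edge: drop right rows below l - d. Because left is
      // sorted, these rows cannot match any later left row either.
      while (window.nonEmpty && window.head < l - d) {
        window.dequeue()
      }
      window.foreach(r => out += ((l, r)))
    }
    out.result()
  }
}
```

In this sketch only the rows inside the current window are held in memory, and each right row is enqueued and dequeued at most once, which is why the queue is expected to stay small when the range is narrow.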


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188243359
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    +                                    joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    +      val equiset = joinKeys.filter{ case (ljk : Expression, rjk : Expression) =>
    --- End diff --
    
    what about using exists?
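
Note: the difference between the two forms can be sketched on plain collections as below. `ExistsSketch` and its method names are hypothetical, and join-key expressions are reduced to the sets of column names they reference. This is an illustration of the suggestion, not the PR's code.

```scala
object ExistsSketch {
  // Stand-in for an expression: the set of column names it references.
  type Refs = Set[String]

  // Shape of the quoted diff: build an intermediate collection, then test it.
  def usedInEquiJoinFilter(joinKeys: Seq[(Refs, Refs)], l: String, r: String): Boolean =
    joinKeys.filter { case (ljk, rjk) => ljk.contains(l) && rjk.contains(r) }.nonEmpty

  // Suggested shape: no intermediate collection, stops at the first match.
  def usedInEquiJoinExists(joinKeys: Seq[(Refs, Refs)], l: String, r: String): Boolean =
    joinKeys.exists { case (ljk, rjk) => ljk.contains(l) && rjk.contains(r) }
}
```

Both return the same answer; `exists` states the intent directly and short-circuits.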


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @viirya I think predicate reordering based on cost estimation (among other things) is an interesting topic for the optimizer. But IMO the topic is not directly related to this PR (I'm not 100% sure, though), so it would be better to file another JIRA to keep the discussion (and the benchmark result you reported). I couldn't find a JIRA entry for the topic.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92441 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92441/testReport)** for PR 21109 at commit [`a2a5f82`](https://github.com/apache/spark/commit/a2a5f82c377402348a82e5db5587504d39f5a894).


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193737191
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import java.util.ConcurrentModificationException
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.{SparkEnv, TaskContext}
    +import org.apache.spark.memory.TaskMemoryManager
    +import org.apache.spark.serializer.SerializerManager
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
    +import org.apache.spark.storage.BlockManager
    +
    +/**
    + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
    + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
    + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
    + * excessive memory consumption). Setting these threshold involves following trade-offs:
    + *
    + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
    + *   than is available, resulting in OOM.
    + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
    + *   excessive disk writes. This may lead to a performance regression compared to the normal case
    + *   of using an [[ArrayBuffer]] or [[Array]].
    + */
    +private[sql] class InMemoryUnsafeRowQueue(
    --- End diff --
    
    Is there no way to avoid making a custom queue implementation here? Is it messier without such a structure?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #95807 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95807/testReport)** for PR 21109 at commit [`0a5c8de`](https://github.com/apache/spark/commit/0a5c8de7769315934712bf853401c332dd747a6e).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89592 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89592/testReport)** for PR 21109 at commit [`4c6a726`](https://github.com/apache/spark/commit/4c6a726bac71796d7210cfde9cf762dfdc38d165).
     * This patch **fails from timeout after a configured wait of `300m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Hey Liang-Chi, thanks for looking into this.
    Yes, the problem can be circumvented by changing the join condition as you describe, but only in the benchmark case, because my "expensive function" was a bit misleading. 
    The problem is not the function itself, but the number of rows that are checked for each pair of matching equi-join keys. 
    I have now changed the benchmark test case to demonstrate this better: I completely removed the expensive function and I'm only doing a count on the matched rows. The results are as follows.
    Without the optimization:
    ```
    AMD EPYC 7401 24-Core Processor
    sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------
    sort merge join wholestage off            30956 / 31374          0.0       75575.5       1.0X
    sort merge join wholestage on             10864 / 11043          0.0       26523.6       2.8X
    ```
    With the optimization:
    ```
    AMD EPYC 7401 24-Core Processor
    sort merge join:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------
    sort merge join wholestage off            30734 / 31135          0.0       75035.2       1.0X
    sort merge join wholestage on                959 / 1040          0.4        2341.3      32.0X
    ```
    This shows a roughly 10x improvement over the non-optimized case with whole-stage codegen on (as I already said, this depends on the range condition, number of matched rows, the calculated function, etc.).
    
    Regarding your second question as to why the "wholestage off" case in the optimized version is so slow: that is because the optimization is turned off when whole-stage code generation is turned off.
    That is simply because it was too hard to debug, and since whole-stage codegen is on by default, I'm guessing (and hoping) that requiring whole-stage codegen to be on in order to use this optimization is not too much to ask.
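
Note: the speedup can be recomputed from the two "wholestage on" rows quoted above. The snippet below is only arithmetic on those reported timings; `SpeedupSketch` and `speedup` are illustrative names. The "Relative" column in each table is measured against the first row of the same table, not against the other run.

```scala
object SpeedupSketch {
  // Ratio of baseline time to optimized time; larger means faster.
  def speedup(baselineMs: Double, optimizedMs: Double): Double =
    baselineMs / optimizedMs

  // "sort merge join wholestage on", best time: 10864 ms vs 959 ms.
  val best: Double = speedup(10864, 959)

  // "sort merge join wholestage on", average time: 11043 ms vs 1040 ms.
  val avg: Double = speedup(11043, 1040)
}
```

Both ratios come out slightly above 10, which agrees with the "roughly 10x" reading of the tables.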


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92023 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92023/testReport)** for PR 21109 at commit [`66d7cbf`](https://github.com/apache/spark/commit/66d7cbf76ec0170bd8e78d4b936e6c7650998f34).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193733146
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); GreaterThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); LessThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); LessThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case _ => None
    +            }
    +          } else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    --- End diff --
    
    Nit: space after "if" here and elsewhere


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91760 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91760/testReport)** for PR 21109 at commit [`5c64b55`](https://github.com/apache/spark/commit/5c64b55766630ffec33ac3c82ce10703dc3d526c).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92441/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Sorry, I don't quite understand your question. This already applies (only) to equijoins if there are additional range conditions on secondary columns. So if Spark rewrites those range conditions (?), and you end up with two equijoins (doesn't sound like a realistic scenario), then this doesn't apply at all.
    But if you have an equi-join which cannot be performed because you need to match a huge number of rows, and you can narrow down the search window using range conditions, then the advantage is that this makes it feasible and/or much, much faster.
    
    Regarding the second point, I don't want to tell you what to do, but on my part I can say that this has been tested with unit tests and on real, large datasets and I believe it should be safe to merge. But it can also wait for 2.5/3.0...



---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188245774
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
    -    // Copy the left keys as class members so they could be used in next function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    +    }
    +    else {
    +      val rightRngTmpKeyVars = createJoinKey(ctx, rightTmpRow,
    +        rightUpperKeys.slice(0, 1), right.output)
    +      val rightRngTmpKeyVarsDecl = rightRngTmpKeyVars.map(_.code).mkString("\n")
    +      rightRngTmpKeyVars.foreach(_.code = "")
    --- End diff --
    
    why are we doing this?


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188266461
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
    -    // Copy the left keys as class members so they could be used in next function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    --- End diff --
    
    Can we avoid using `inlineToOuterClass = true`? I think it can be dropped here.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @zecevicp whole-stage codegen is now enabled by default only when there are few columns (fewer than 100), which is not the case in many real use cases. Is there any specific reason why this optimization cannot be applied to the non-whole-stage codegen case? If not, I think it is worth considering that case as well.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Can you please restart the build? It took 5 hours for some reason, but the "inner range join" tests completed successfully. 


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91912 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91912/testReport)** for PR 21109 at commit [`3e9b3e2`](https://github.com/apache/spark/commit/3e9b3e2be380d3973cb559ce88b3fff588f3dd3e).
     * This patch **fails Scala style tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Well, that is the essence of the contribution: to have a moving window over the data, instead of a fixed block (per equi-join match). To implement a moving window you need something like a queue.
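
The moving-window idea can be illustrated with a minimal sketch. It is not the PR's `SortMergeJoinExec` code: the `Row` type, the `join` function and the `lower`/`upper` offsets are hypothetical names chosen for illustration, and both inputs are assumed to be already sorted by (key, y). For each left row, rows that have fallen below the lower bound are dequeued from the front and rows up to the upper bound are enqueued at the back, so each right row is visited once.

```scala
import scala.collection.mutable

object MovingWindowJoinSketch {
  // Hypothetical row: `key` is the equi-join column, `y` is the range column.
  final case class Row(key: Int, y: Int)

  // Emits (l, r) pairs with l.key == r.key and l.y - lower <= r.y <= l.y + upper.
  // Assumes `left` and `right` are both sorted by (key, y).
  def join(left: Seq[Row], right: Seq[Row], lower: Int, upper: Int): List[(Row, Row)] = {
    val rightRows = right.toIndexedSeq
    val window = mutable.Queue.empty[Row] // right rows currently inside the range
    val out = mutable.ArrayBuffer.empty[(Row, Row)]
    var next = 0 // index of the first right row not yet consumed

    for (l <- left) {
      // A new equi-join key invalidates the whole window.
      if (window.nonEmpty && window.head.key != l.key) window.clear()
      // Slide the lower edge: drop rows that are now below the range.
      while (window.nonEmpty && window.head.y < l.y - lower) window.dequeue()
      // Skip right rows whose key is smaller than the current left key.
      while (next < rightRows.length && rightRows(next).key < l.key) next += 1
      // Slide the upper edge: consume right rows up to the upper bound.
      while (next < rightRows.length && rightRows(next).key == l.key &&
          rightRows(next).y <= l.y + upper) {
        val r = rightRows(next)
        // Rows below the lower bound can never match a later left row either.
        if (r.y >= l.y - lower) window.enqueue(r)
        next += 1
      }
      window.foreach(r => out += ((l, r)))
    }
    out.toList
  }
}
```

With a fixed block per equi-join key, every left row would be compared against all right rows sharing that key. Here the queue only ever holds the rows inside the current range.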


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91913 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91913/testReport)** for PR 21109 at commit [`746fad3`](https://github.com/apache/spark/commit/746fad3f9caf5d8d43f91b66c45091ef52411ce2).
     * This patch **fails Scala style tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91917 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91917/testReport)** for PR 21109 at commit [`8eaab13`](https://github.com/apache/spark/commit/8eaab130b9eb6b9f3224c323165b6dc2b175f8ac).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @tedyu @gatorsmile Can you please trigger the build again? It failed for some unrelated reason. Thanks


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188254046
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    +                                    joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    --- End diff --
    
    Seemed simpler. I can change it if you insist.
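
For comparison, the string tags used in the diff above ("asis", "vs", "none") could be modelled as a small sealed type. The following is only a sketch under stated assumptions: `Operand`, `Comparison`, `Orientation` and `normalize` are hypothetical stand-ins, not Catalyst classes. It assumes, based on how the diff uses the tags, that "vs" means the operands are on the opposite sides, so they are swapped and the operator is flipped.

```scala
object RangeConditionSketch {
  sealed trait Side
  case object LeftTable extends Side
  case object RightTable extends Side

  // Hypothetical operand: a column name plus the table it belongs to.
  final case class Operand(column: String, side: Side)

  // Comparison operators; `flipped` is the operator to use after swapping operands.
  sealed trait Op { def flipped: Op }
  case object Lt extends Op { def flipped: Op = Gt }
  case object LtEq extends Op { def flipped: Op = GtEq }
  case object Gt extends Op { def flipped: Op = Lt }
  case object GtEq extends Op { def flipped: Op = LtEq }

  final case class Comparison(l: Operand, op: Op, r: Operand)

  // Typed replacement for the "asis" / "vs" / "none" strings.
  sealed trait Orientation
  case object AsIs extends Orientation
  case object Swapped extends Orientation
  case object NotUsable extends Orientation

  def orientation(c: Comparison): Orientation = (c.l.side, c.r.side) match {
    case (LeftTable, RightTable) => AsIs
    case (RightTable, LeftTable) => Swapped
    case _ => NotUsable
  }

  // Rewrites the comparison so the left-table operand is always on the left.
  def normalize(c: Comparison): Option[Comparison] = orientation(c) match {
    case AsIs => Some(c)
    case Swapped => Some(Comparison(c.r, c.op.flipped, c.l))
    case NotUsable => None
  }
}
```

With a sealed type, the compiler can warn about a missing case in a `match`, which a string tag cannot offer.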


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    There's no design doc. I didn't feel the change was big enough to warrant one.
    
    1. Currently there is no spill-over to disk. If the range is too big, users can switch this off and use the much slower SMJ version, without an OOM. Implementing spill-over doesn't look trivial because it's more dynamic than the original version. It's not clear how to implement that. Maybe we can add that in the future, once we figure it out?
    2. This whole optimization doesn't apply when there is no equality condition.
    3. I didn't understand this case you're describing. Can you elaborate, please? Either way, only one pass through the data is needed, skewed or not skewed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #90642 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90642/testReport)** for PR 21109 at commit [`48b1c0e`](https://github.com/apache/spark/commit/48b1c0e73d3db1c406b719a2343a9c395627bea2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91912 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91912/testReport)** for PR 21109 at commit [`3e9b3e2`](https://github.com/apache/spark/commit/3e9b3e2be380d3973cb559ce88b3fff588f3dd3e).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89641 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89641/testReport)** for PR 21109 at commit [`4c6a726`](https://github.com/apache/spark/commit/4c6a726bac71796d7210cfde9cf762dfdc38d165).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94631/
    Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89934 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89934/testReport)** for PR 21109 at commit [`fbec452`](https://github.com/apache/spark/commit/fbec452e6376d35c8001544d96fe42831e790f9d).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193753823
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala ---
    @@ -0,0 +1,183 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import java.util.ConcurrentModificationException
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.{SparkEnv, TaskContext}
    +import org.apache.spark.memory.TaskMemoryManager
    +import org.apache.spark.serializer.SerializerManager
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
    +import org.apache.spark.storage.BlockManager
    +
    +/**
    + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
    + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
    + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
    + * excessive memory consumption). Setting these threshold involves following trade-offs:
    + *
    + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
    + *   than is available, resulting in OOM.
    + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
    + *   excessive disk writes. This may lead to a performance regression compared to the normal case
    + *   of using an [[ArrayBuffer]] or [[Array]].
    + */
    +private[sql] class InMemoryUnsafeRowQueue(
    --- End diff --
    
    A queue is needed here because it's a moving window instead of a fixed block of rows. Maybe I missed an existing class that could do this easily so I'll take another look. But, I believe any alternative would indeed be messier.
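
To make the data-structure requirement concrete, here is a minimal sketch of a bounded, purely in-memory FIFO queue backed by a ring buffer. It is not the PR's `InMemoryUnsafeRowQueue`: the class name `BoundedRowQueue`, its methods and its fail-when-full behaviour are assumptions made for illustration only. The point is that a moving window needs cheap removal from the front as well as appends at the back, which an append-only array does not provide.

```scala
import scala.reflect.ClassTag

// Hypothetical fixed-capacity FIFO queue; rows live only in memory (no spilling).
final class BoundedRowQueue[T: ClassTag](capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val buf = new Array[T](capacity)
  private var head = 0 // index of the oldest element
  private var size = 0 // number of elements currently stored

  def length: Int = size
  def isEmpty: Boolean = size == 0

  // Appends at the back; fails instead of spilling when the buffer is full.
  def add(row: T): Unit = {
    if (size == capacity) {
      throw new IllegalStateException(s"queue is full ($capacity rows)")
    }
    buf((head + size) % capacity) = row
    size += 1
  }

  // Removes and returns the oldest element.
  def dequeue(): T = {
    if (size == 0) throw new NoSuchElementException("queue is empty")
    val row = buf(head)
    head = (head + 1) % capacity
    size -= 1
    row
  }

  def clear(): Unit = { head = 0; size = 0 }

  // Iterates from oldest to newest over the contents at the time of the call.
  def iterator: Iterator[T] = {
    val start = head
    val n = size
    Iterator.tabulate(n)(i => buf((start + i) % capacity))
  }
}
```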


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #94630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94630/testReport)** for PR 21109 at commit [`62a8071`](https://github.com/apache/spark/commit/62a807168db9522673c194fbfa8c326c8688da82).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99907/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89961/
    Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Can you post performance numbers for this PR? It would also be better to add benchmark code to `JoinBenchmark`.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89641 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89641/testReport)** for PR 21109 at commit [`4c6a726`](https://github.com/apache/spark/commit/4c6a726bac71796d7210cfde9cf762dfdc38d165).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92023 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92023/testReport)** for PR 21109 at commit [`66d7cbf`](https://github.com/apache/spark/commit/66d7cbf76ec0170bd8e78d4b936e6c7650998f34).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92089/testReport)** for PR 21109 at commit [`9889ba1`](https://github.com/apache/spark/commit/9889ba1ddbf0bfcb4d48b890634b6389ac4bd535).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Btw, thank you @mgaido91 and @kiszk for the comments.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92089/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    ok to test


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193736960
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala ---
    @@ -117,101 +131,170 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
         }
     
         def makeSortMergeJoin(
    -        leftKeys: Seq[Expression],
    -        rightKeys: Seq[Expression],
    -        boundCondition: Option[Expression],
    -        leftPlan: SparkPlan,
    -        rightPlan: SparkPlan) = {
    -      val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition,
    -        leftPlan, rightPlan)
    +                           leftKeys: Seq[Expression],
    +                           rightKeys: Seq[Expression],
    +                           boundCondition: Option[Expression],
    +                           rangeConditions: Seq[BinaryComparison],
    +                           leftPlan: SparkPlan,
    +                           rightPlan: SparkPlan) = {
    +      val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, rangeConditions,
    +        boundCondition, leftPlan, rightPlan)
           EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
         }
     
    -    test(s"$testName using BroadcastHashJoin (build=left)") {
    -      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
    -        withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
    -          checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
    -            makeBroadcastHashJoin(
    -              leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft),
    -            expectedAnswer.map(Row.fromTuple),
    -            sortAnswers = true)
    +    val configOptions = List(
    +      ("spark.sql.codegen.wholeStage", "true"),
    +      ("spark.sql.codegen.wholeStage", "false"))
    +
    +    // Disabling these because the code would never follow this path in case of a inner range join
    +    if (!expectRangeJoin) {
    +      var counter = 1
    --- End diff --
    
    If you want to avoid a `var`, just use `configOptions.zipWithIndex.foreach { case ((config, confValue), counter) =>`. It's a tiny bit more idiomatic.
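
A minimal, self-contained sketch of that suggestion, run outside the test suite. The `configOptions` list mirrors the diff; the object name, the `label` helper, and the label format are hypothetical and exist only for illustration. Note that `zipWithIndex` yields a 0-based index, so the sketch adds 1 to match a counter that starts at 1.

```scala
object ZipWithIndexSketch {
  // Same shape as the list in the diff: (config key, config value) pairs.
  val configOptions: List[(String, String)] = List(
    ("spark.sql.codegen.wholeStage", "true"),
    ("spark.sql.codegen.wholeStage", "false"))

  // Hypothetical helper: builds a label for one configuration run.
  def label(config: String, confValue: String, counter: Int): String =
    s"run $counter: $config=$confValue"

  // No mutable counter: the index comes from zipWithIndex (0-based, hence idx + 1).
  val labels: List[String] = configOptions.zipWithIndex.map {
    case ((config, confValue), idx) => label(config, confValue, idx + 1)
  }

  def main(args: Array[String]): Unit = labels.foreach(println)
}
```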


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #94630 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94630/testReport)** for PR 21109 at commit [`62a8071`](https://github.com/apache/spark/commit/62a807168db9522673c194fbfa8c326c8688da82).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92071/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89934 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89934/testReport)** for PR 21109 at commit [`fbec452`](https://github.com/apache/spark/commit/fbec452e6376d35c8001544d96fe42831e790f9d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188242302
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    --- End diff --
    
    Can we add the return type and a description to this method? Also, the name doesn't seem right: from the name, I'd expect it to return a boolean.
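
One possible shape for such a change, as a sketch only: the `"asis"` / `"vs"` strings become a small sealed type, the method gets an explicit return type, and the name no longer suggests a boolean. Everything here is hypothetical (`Orientation`, `AsIs`, `Swapped`, `rangeOrientation`), and expressions and plans are reduced to plain table names so the example runs without Spark.

```scala
object RangeOrientationSketch {
  // Hypothetical replacement for the "asis" / "vs" string results.
  sealed trait Orientation
  case object AsIs extends Orientation    // l belongs to the left plan, r to the right plan
  case object Swapped extends Orientation // l belongs to the right plan, r to the left plan

  /**
   * Toy stand-in for the real check: each "expression" is just the name of
   * the table it refers to. Returns None when the pair cannot be used.
   */
  def rangeOrientation(
      l: String,
      r: String,
      left: String,
      right: String): Option[Orientation] = {
    if (l == left && r == right) {
      Some(AsIs)
    } else if (l == right && r == left) {
      Some(Swapped)
    } else {
      None
    }
  }
}
```

With a sealed type, a `match` on the result is checked for exhaustiveness by the compiler, which a match on strings is not.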


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188265529
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
    -    // Copy the left keys as class members so they could be used in next function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    +    }
    +    else {
    +      val rightRngTmpKeyVars = createJoinKey(ctx, rightTmpRow,
    +        rightUpperKeys.slice(0, 1), right.output)
    +      val rightRngTmpKeyVarsDecl = rightRngTmpKeyVars.map(_.code).mkString("\n")
    +      rightRngTmpKeyVars.foreach(_.code = "")
    --- End diff --
    
    If you mean why we are clearing the `code` variable: I found the same thing at `WholeStageCodegenExec:263`, where it's claimed that this prevents the variable from being evaluated twice. The code works, so I didn't investigate further.
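
A toy illustration of that pattern, under the assumption that the purpose is "emit the evaluation code once, then refer to the variable by name afterwards". `ToyExprCode` and `evaluateOnce` are hypothetical names and are not Spark classes or methods; the sketch only shows why blanking `code` after the first emission prevents a second evaluation.

```scala
object ClearCodeSketch {
  // Toy stand-in for a generated-code holder: `code` is the snippet that
  // computes the variable, `value` is the variable name to reference later.
  final class ToyExprCode(var code: String, val value: String)

  // Emit any pending evaluation code, then blank it so that a later call
  // (or a later consumer) does not emit the same evaluation a second time.
  def evaluateOnce(vars: Seq[ToyExprCode]): String = {
    val emitted = vars.map(_.code).filter(_.nonEmpty).mkString("\n")
    vars.foreach(_.code = "")
    emitted
  }
}
```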


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188246302
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -840,6 +1049,241 @@ private[joins] class SortMergeJoinScanner(
       }
     }
     
    +/**
    + * Helper class that is used to implement [[SortMergeJoinExec]].
    + *
    + * To perform an inner (outer) join, users of this class call [[findNextInnerJoinRows()]]
    + * which returns `true` if a result has been produced and `false`
    + * otherwise. If a result has been produced, then the caller may call [[getStreamedRow]] to return
    + * the matching row from the streamed input and may call [[getBufferedMatches]] to return the
    + * sequence of matching rows from the buffered input (in the case of an outer join, this will
    + * return an empty sequence if there are no matches from the buffered input). For efficiency,
    + * both of these methods return mutable objects which are re-used across calls to
    + * the `findNext*JoinRows()` methods.
    + *
    + * @param streamedKeyGenerator a projection that produces join keys from the streamed input.
    + * @param bufferedKeyGenerator a projection that produces join keys from the buffered input.
    + * @param keyOrdering an ordering which can be used to compare join keys.
    + * @param streamedIter an input whose rows will be streamed.
    + * @param bufferedIter an input whose rows will be buffered to construct sequences of rows that
    + *                     have the same join key.
    + * @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
    + *                          internal buffer
    + * @param spillThreshold Threshold for number of rows to be spilled by internal buffer
    + */
    +private[joins] class SortMergeJoinInnerRangeScanner(
    +                                           streamedKeyGenerator: Projection,
    --- End diff --
    
    nit: indent


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92038/
    Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92038 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92038/testReport)** for PR 21109 at commit [`66d7cbf`](https://github.com/apache/spark/commit/66d7cbf76ec0170bd8e78d4b936e6c7650998f34).
     * This patch **fails PySpark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193754271
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    I think you're right.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91824 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91824/testReport)** for PR 21109 at commit [`82c194a`](https://github.com/apache/spark/commit/82c194a8a03b6cc028de303fbc07c68d6078cc2b).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Why do you say that it is difficult to debug? What was difficult?


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193735681
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); GreaterThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map {
    +                case true => rangePreds.add(p); LessThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); LessThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case _ => None
    +            }
    +          } else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  /**
    +   * Checks if l and r are valid range conditions:
    +   *   - l and r expressions should both contain a single reference to one and the same column.
    +   *   - the referenced column should not be part of joinKeys
    +   * If these conditions are not met, the function returns None.
    +   *
    +   * Otherwise, the function checks if the left plan contains l expression and the right plan
    +   * contains r expression. If the expressions need to be switched, the function returns Some(true)
    +   * and Some(false) otherwise.
    +   */
    +  private def checkRangeConditions(l : Expression, r : Expression,
    +      left : LogicalPlan, right : LogicalPlan,
    +      joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      None
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    --- End diff --
    
    Nit: pull else onto previous line
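
For reference, a sketch of the contract described in the doc comment of the quoted diff, written with `} else if` on one line and an explicit return type. It is a toy model, not the PR's code: each expression is reduced to the set of column names it references, the plans are reduced to their column sets, and the parameter names `leftCols`, `rightCols`, and `joinKeyCols` are assumptions. The sketch reads the first condition as "each side references exactly one column".

```scala
object CheckRangeSketch {
  // Toy model: an expression is the set of qualified columns it references.
  type Refs = Set[String]

  /**
   * Returns None if the pair is not usable as a range condition,
   * Some(false) if l belongs to the left plan and r to the right plan,
   * Some(true) if the two sides need to be switched.
   */
  def checkRangeConditions(
      l: Refs,
      r: Refs,
      leftCols: Set[String],
      rightCols: Set[String],
      joinKeyCols: Set[String]): Option[Boolean] = {
    if (l.size != 1 || r.size != 1) {
      None
    } else if (l.exists(joinKeyCols) || r.exists(joinKeyCols)) {
      // A column already used in the equi-join cannot serve as the range column.
      None
    } else if (l.subsetOf(leftCols) && r.subsetOf(rightCols)) {
      Some(false)
    } else if (l.subsetOf(rightCols) && r.subsetOf(leftCols)) {
      Some(true)
    } else {
      None
    }
  }
}
```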


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #95807 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95807/testReport)** for PR 21109 at commit [`0a5c8de`](https://github.com/apache/spark/commit/0a5c8de7769315934712bf853401c332dd747a6e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188254449
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    --- End diff --
    
    That's true.
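
The final check in the quoted diff (exactly two range conditions, one lower and one upper bound, both over the same pair of columns) can be sketched in isolation. This is only an illustration: `Cmp`, `Lt`, `LtEq`, `Gt`, `GtEq` and `RangeCheck.usable` are hypothetical stand-ins, not Catalyst classes, and each side of a comparison is reduced to a single column name instead of an expression's `references`.

```scala
// Hypothetical stand-ins for Catalyst comparison expressions (not Spark's real classes).
sealed trait Cmp { def leftCol: String; def rightCol: String }
case class Lt(leftCol: String, rightCol: String) extends Cmp
case class LtEq(leftCol: String, rightCol: String) extends Cmp
case class Gt(leftCol: String, rightCol: String) extends Cmp
case class GtEq(leftCol: String, rightCol: String) extends Cmp

object RangeCheck {
  // True only when there are exactly two conditions, at least one upper bound
  // (< or <=) and one lower bound (> or >=), and both conditions compare the
  // same left column with the same right column.
  def usable(conds: Seq[Cmp]): Boolean = {
    val hasUpper = conds.exists { case _: Lt | _: LtEq => true; case _ => false }
    val hasLower = conds.exists { case _: Gt | _: GtEq => true; case _ => false }
    conds.size == 2 && hasUpper && hasLower &&
      conds.map(_.leftCol).distinct.size == 1 &&
      conds.map(_.rightCol).distinct.size == 1
  }
}
```

With this sketch, `RangeCheck.usable(Seq(Lt("t1.a", "t2.b"), Gt("t1.a", "t2.b")))` is accepted, while a single bound, two upper bounds, or bounds over different columns are rejected.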


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91824/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92247: Deflake Build #92089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92247/testReport)** for PR 21109 at commit [`9889ba1`](https://github.com/apache/spark/commit/9889ba1ddbf0bfcb4d48b890634b6389ac4bd535).
     * This patch **fails due to an unknown error code, -9**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92472 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92472/testReport)** for PR 21109 at commit [`39247ba`](https://github.com/apache/spark/commit/39247bac0de645aa959cc7fd11a27e36532181a5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91785/


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188254469
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    I see. Maybe we can make this clearer in the comments, then. What do you think?


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188241786
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    --- End diff --
    
    nit: bad indent


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Hi Gaido, thanks for the comment. As I said, it was difficult to debug and I didn't have time. Could we open a separate ticket for the non-whole-stage codegen case once this is merged?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91917 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91917/testReport)** for PR 21109 at commit [`8eaab13`](https://github.com/apache/spark/commit/8eaab130b9eb6b9f3224c323165b6dc2b175f8ac).
     * This patch **fails from timeout after a configured wait of `300m`**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91534/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91824 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91824/testReport)** for PR 21109 at commit [`82c194a`](https://github.com/apache/spark/commit/82c194a8a03b6cc028de303fbc07c68d6078cc2b).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91900 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91900/testReport)** for PR 21109 at commit [`6d72fe0`](https://github.com/apache/spark/commit/6d72fe0466f210e001a67829e0f42379abc7e4f0).


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188269083
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true)
    -    // Copy the left keys as class members so they could be used in next function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    --- End diff --
    
    Ah, OK. I'll investigate.
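
For orientation, the moving-window idea behind the dequeue logic in the quoted diff can be sketched outside of code generation. This is a simplified illustration under stated assumptions, not the PR's `InMemoryUnsafeRowQueue` or the generated `dequeueUntilUpperConditionHolds` method: rows are plain `Int` values that already share one equi-join key, both inputs are sorted ascending, and the condition is `r BETWEEN l - d1 AND l + d2`. The names `WindowJoin` and `rangeJoin` are hypothetical.

```scala
import scala.collection.mutable

object WindowJoin {
  // Returns all (l, r) pairs with l - d1 <= r <= l + d2.
  // Assumes both `left` and `right` are sorted in ascending order.
  def rangeJoin(left: Seq[Int], right: Seq[Int], d1: Int, d2: Int): Seq[(Int, Int)] = {
    val window = mutable.Queue.empty[Int]      // right rows currently in range
    val out = mutable.ArrayBuffer.empty[(Int, Int)]
    var next = 0                               // first right row not yet enqueued
    for (l <- left) {
      // Enqueue right rows up to the upper bound of the current left row.
      while (next < right.length && right(next) <= l + d2) {
        window.enqueue(right(next))
        next += 1
      }
      // Dequeue rows below the lower bound. Later left rows are not smaller,
      // so these rows can never match again.
      while (window.nonEmpty && window.head < l - d1) window.dequeue()
      // Everything left in the window matches the current left row.
      window.foreach(r => out += ((l, r)))
    }
    out.toSeq
  }
}
```

Because both bounds only move forward as the left side advances, each right row is enqueued and dequeued at most once, which is the point of keeping a window instead of rescanning all right rows that share the equi-join key.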


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89592/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    What's the advantage of this feature when Spark can logically rewrite a range join into an equi-join?
    
    BTW I also hesitate to merge such a big patch to the SQL engine since we are close to code freeze.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #90475 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90475/testReport)** for PR 21109 at commit [`535d0d6`](https://github.com/apache/spark/commit/535d0d63b33ef320fded52c18b2716a1333255ce).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #90477 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90477/testReport)** for PR 21109 at commit [`6dc9000`](https://github.com/apache/spark/commit/6dc90002602d7790d2ab4a9d4946f61c4c8c078e).


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193751918
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1205,6 +1205,19 @@ object SQLConf {
           .booleanConf
           .createWithDefault(true)
     
    +  val USE_SMJ_INNER_RANGE_OPTIMIZATION =
    --- End diff --
    
    It's just a safety valve, in case there are queries I can't foresee now where this optimization could get in the way.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91913 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91913/testReport)** for PR 21109 at commit [`746fad3`](https://github.com/apache/spark/commit/746fad3f9caf5d8d43f91b66c45091ef52411ce2).


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188253438
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -97,13 +100,13 @@ object PhysicalOperation extends PredicateHelper {
      * value).
      */
     object ExtractEquiJoinKeys extends Logging with PredicateHelper {
    -  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
    +  /** (joinType, leftKeys, rightKeys, rangeConditions, condition, leftChild, rightChild) */
       type ReturnType =
    -    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
    +    (JoinType, Seq[Expression], Seq[Expression], Seq[BinaryComparison],
    +      Option[Expression], LogicalPlan, LogicalPlan)
     
       def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
         case join @ Join(left, right, joinType, condition) =>
    -      logDebug(s"Considering join on: $condition")
    --- End diff --
    
    OK, you could be right. I'll put it back.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188251861
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -97,13 +100,13 @@ object PhysicalOperation extends PredicateHelper {
      * value).
      */
     object ExtractEquiJoinKeys extends Logging with PredicateHelper {
    -  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
    +  /** (joinType, leftKeys, rightKeys, rangeConditions, condition, leftChild, rightChild) */
       type ReturnType =
    -    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
    +    (JoinType, Seq[Expression], Seq[Expression], Seq[BinaryComparison],
    +      Option[Expression], LogicalPlan, LogicalPlan)
     
       def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
         case join @ Join(left, right, joinType, condition) =>
    -      logDebug(s"Considering join on: $condition")
    --- End diff --
    
    I don't think so. This is the diff against master, and that line was added in 2014.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188252110
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    I see that you are using it later, but can't you use `rangeConditions` instead? They seem to be duplicates; IIUC they contain the same data.
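
As far as the quoted diff shows, the two collections are not quite identical: `rangePreds` stores the original predicate `p` (so it can later be filtered out of `otherPredicates`), while `rangeConditions` stores a normalized copy that is flipped in the "vs" case. A minimal sketch of that distinction follows, using a hypothetical, simplified predicate model (`Pred`, `Less`, `Greater`, `Normalize.split`) in place of Catalyst expressions, and assuming columns are written as `table.column`.

```scala
// Hypothetical, simplified predicate model (not Catalyst's Expression API).
sealed trait Pred
case class Less(l: String, r: String) extends Pred
case class Greater(l: String, r: String) extends Pred

object Normalize {
  // Columns are written "table.column"; this extracts the table part.
  private def table(col: String): String = col.takeWhile(_ != '.')

  // Returns (original predicate, normalized condition) pairs, where the
  // normalized condition always has the left table's column on its left side.
  def split(preds: Seq[Pred], leftTable: String): Seq[(Pred, Pred)] = preds.map {
    case p @ Less(l, _) if table(l) == leftTable    => (p, p)             // "asis"
    case p @ Less(l, r)                             => (p, Greater(r, l)) // "vs"
    case p @ Greater(l, _) if table(l) == leftTable => (p, p)             // "asis"
    case p @ Greater(l, r)                          => (p, Less(r, l))    // "vs"
  }
}
```

For `t2.y < t1.y` with `t1` as the left table, the first element of the pair is the predicate as written and the second is `t1.y > t2.y`, so the two sets only coincide when no predicate needs flipping.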


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188240094
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    --- End diff --
    
    nit: unnecessary empty line


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92023/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91533 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91533/testReport)** for PR 21109 at commit [`9a0b2ab`](https://github.com/apache/spark/commit/9a0b2abf482a51c1e9901ce5297c9ca1b7961765).
     * This patch **fails Scala style tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92441 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92441/testReport)** for PR 21109 at commit [`a2a5f82`](https://github.com/apache/spark/commit/a2a5f82c377402348a82e5db5587504d39f5a894).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188250615
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -97,13 +100,13 @@ object PhysicalOperation extends PredicateHelper {
      * value).
      */
     object ExtractEquiJoinKeys extends Logging with PredicateHelper {
    -  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
    +  /** (joinType, leftKeys, rightKeys, rangeConditions, condition, leftChild, rightChild) */
       type ReturnType =
    -    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
    +    (JoinType, Seq[Expression], Seq[Expression], Seq[BinaryComparison],
    +      Option[Expression], LogicalPlan, LogicalPlan)
     
       def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
         case join @ Join(left, right, joinType, condition) =>
    -      logDebug(s"Considering join on: $condition")
    --- End diff --
    
    I added it in the first place. Tried to remove unnecessary code


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193735061
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    I tend to prefer `val rangePreds = mutable.Set.empty[Expression]` as it's shorter, but that's just taste


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92089 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92089/testReport)** for PR 21109 at commit [`9889ba1`](https://github.com/apache/spark/commit/9889ba1ddbf0bfcb4d48b890634b6389ac4bd535).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #90477 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90477/testReport)** for PR 21109 at commit [`6dc9000`](https://github.com/apache/spark/commit/6dc90002602d7790d2ab4a9d4946f61c4c8c078e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92071 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92071/testReport)** for PR 21109 at commit [`3ccc292`](https://github.com/apache/spark/commit/3ccc2929dfd828e333d4deaacdb993e9fc7e5f28).
     * This patch **fails due to an unknown error code, -9**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91534 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91534/testReport)** for PR 21109 at commit [`3fa690f`](https://github.com/apache/spark/commit/3fa690faf4e9a0b7d8eb2a5854ebb6a854a44d2a).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95807/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193743605
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1205,6 +1205,19 @@ object SQLConf {
           .booleanConf
           .createWithDefault(true)
     
    +  val USE_SMJ_INNER_RANGE_OPTIMIZATION =
    --- End diff --
    
    Yes, at best make this internal. Are there conditions where you would not want to apply this? Is it just a safety valve?


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188241708
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    why do we need this? Can't we just use `rangeConditions`?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by ctslater <gi...@git.apache.org>.
Github user ctslater commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @cloud-fan, could you point us at the papers that describe rewriting range join into equi-join? That would be a very convenient solution, but I've been digging through the literature and haven't found anything showing how to do that. All the experiments I've tried result in cartesian joins.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #91534 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91534/testReport)** for PR 21109 at commit [`3fa690f`](https://github.com/apache/spark/commit/3fa690faf4e9a0b7d8eb2a5854ebb6a854a44d2a).


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188264496
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    --- End diff --
    
    nit: `} else {`?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91760/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    > Regarding the second point, this is not an ordinary range join, but an equi-join with a secondary range condition.
    
    "an equi-join with a secondary range condition" is a restriction, isn't it? IIRC the logical rewrite approach can work for range joins without an equi-join, which has broader use cases, e.g. turning a Cartesian join into a hash join/SMJ.
    
    I took a quick look at this patch. It's really bulky, and it's not safe to turn on by default without the ability to spill. More importantly, it can't work for a range join without an equi-condition, which I think is a common use case.
    
    Can you investigate the logical rewrite approach? I'll attach some related papers if I find them.
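    
    The thread does not say which rewrite is meant. As one possible illustration only, a range condition `|l - r| <= d` can be turned into an equi-join on a bucket id followed by a filter on the original condition. The sketch below is plain Scala, not Spark code; `binnedJoin`, `nestedLoopJoin`, `width` and the object name are hypothetical names chosen for the example.
    
    ```scala
    // A minimal sketch of a "binning" rewrite, assuming integer-valued keys.
    // None of these names come from the PR or from Spark.
    object BinnedRangeJoinSketch {
      // Evaluates |l - r| <= d by equi-joining on a bucket id, then filtering.
      def binnedJoin(left: Seq[Long], right: Seq[Long], d: Long, width: Long): Seq[(Long, Long)] = {
        require(d >= 0 && width > 0)
        def bucket(v: Long): Long = Math.floorDiv(v, width)
        // Each right value is replicated into every bucket that [r - d, r + d] overlaps.
        val rightByBucket: Map[Long, Seq[Long]] =
          right
            .flatMap(r => (bucket(r - d) to bucket(r + d)).map(b => b -> r))
            .groupBy(_._1)
            .map { case (b, pairs) => b -> pairs.map(_._2) }
        // Each left value falls into exactly one bucket, so no duplicate pairs arise.
        for {
          l <- left
          r <- rightByBucket.getOrElse(bucket(l), Seq.empty[Long])
          if math.abs(l - r) <= d
        } yield (l, r)
      }
    
      // Reference: nested-loop (Cartesian) evaluation of the same condition.
      def nestedLoopJoin(left: Seq[Long], right: Seq[Long], d: Long): Seq[(Long, Long)] =
        for (l <- left; r <- right if math.abs(l - r) <= d) yield (l, r)
    }
    ```
    
    The trade-off in this sketch is replication: a small `width` relative to `d` copies each right value into many buckets, while a large `width` leaves more candidate pairs for the final filter.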


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188253781
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    `rangeConditions` can contain "vice-versa" (operand-swapped) conditions, for the case where the left and right plans need to be switched, so it cannot be used to filter out the original predicates.
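    
    A toy model may make the distinction clearer. The classes below are simplified stand-ins written for this example, not the Catalyst `LessThan`/`GreaterThan` expressions, and `normalize` and `split` are hypothetical helpers rather than code from the patch.
    
    ```scala
    // Sketch only: why the normalized conditions and the original predicates
    // are tracked separately. All names here are illustrative assumptions.
    object RangeConditionSketch {
      sealed trait Side
      case object LeftPlan extends Side
      case object RightPlan extends Side
    
      final case class Col(name: String, side: Side)
      sealed trait Cmp { def l: Col; def r: Col }
      final case class LessThan(l: Col, r: Col) extends Cmp
      final case class GreaterThan(l: Col, r: Col) extends Cmp
    
      // Rewrites a comparison so that the left-plan column is the left operand.
      // Returns None when both columns come from the same plan.
      def normalize(p: Cmp): Option[Cmp] = (p.l.side, p.r.side) match {
        case (LeftPlan, RightPlan) => Some(p) // "as is"
        case (RightPlan, LeftPlan) => p match { // "vice versa": swap operands, flip operator
          case LessThan(l, r) => Some(GreaterThan(r, l))
          case GreaterThan(l, r) => Some(LessThan(r, l))
        }
        case _ => None
      }
    
      // Splits predicates into (normalized range conditions, remaining predicates).
      def split(preds: Seq[Cmp]): (Seq[Cmp], Seq[Cmp]) = {
        val matched = preds.flatMap(p => normalize(p).map(n => p -> n))
        val originals = matched.map(_._1).toSet
        (matched.map(_._2), preds.filterNot(originals.contains))
      }
    }
    ```
    
    In this model a swapped condition is not equal to the predicate it came from, so removing matched predicates from the remaining ones requires remembering the originals.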


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @davies pointed out that `InMemoryUnsafeRowQueue` was throwing an exception when `numRowsInMemoryBufferThreshold` was reached. That is now fixed and only a warning is logged. 
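    
    As a rough picture of the moving-window idea, with a warning instead of an exception once a threshold is passed, here is a sketch in plain Scala. It is not the `InMemoryUnsafeRowQueue` implementation, whose internals are not shown in this thread; `windowJoin`, `lower`, `upper` and `warnThreshold` are hypothetical names, and both inputs are assumed to be sorted ascending within one equi-join key group.
    
    ```scala
    import scala.collection.mutable
    
    // Sketch only: a sliding window over sorted right-side values.
    object MovingWindowSketch {
      // Emits pairs (l, r) with l - lower <= r <= l + upper.
      def windowJoin(
          left: Seq[Int],
          right: Seq[Int],
          lower: Int,
          upper: Int,
          warnThreshold: Int): Seq[(Int, Int)] = {
        val window = mutable.Queue.empty[Int]
        val out = mutable.ArrayBuffer.empty[(Int, Int)]
        val rightIter = right.iterator.buffered
        var warned = false
        for (l <- left) {
          // Pull in right values up to the upper bound for the current left value.
          while (rightIter.hasNext && rightIter.head <= l + upper) {
            window.enqueue(rightIter.next())
            if (window.size > warnThreshold && !warned) {
              warned = true
              // Log a warning once and keep going, rather than throwing.
              Console.err.println(s"window grew past $warnThreshold rows")
            }
          }
          // Drop values below the lower bound; later left values are no smaller,
          // so these values cannot match again.
          while (window.nonEmpty && window.head < l - lower) window.dequeue()
          window.foreach(r => out += ((l, r)))
        }
        out.toList
      }
    }
    ```
    
    Because both bounds only move forward as the left value grows, each right value is enqueued and dequeued at most once.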


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    @zecevicp for instance, do we really need `InMemoryUnsafeRowQueue`? Why is `ExternalAppendOnlyUnsafeRowArray` not OK?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #92071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92071/testReport)** for PR 21109 at commit [`3ccc292`](https://github.com/apache/spark/commit/3ccc2929dfd828e333d4deaacdb993e9fc7e5f28).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #89592 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89592/testReport)** for PR 21109 at commit [`4c6a726`](https://github.com/apache/spark/commit/4c6a726bac71796d7210cfde9cf762dfdc38d165).


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    **[Test build #94631 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94631/testReport)** for PR 21109 at commit [`af59b8a`](https://github.com/apache/spark/commit/af59b8a285ffe968d87cdf1bf9758b142e0bcfd4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89641/


---



[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21109
  
    Thanks for working on this.
    
    Based on the description in JIRA, I think the main cause of the bad performance is the re-calculation of an expensive function on matched rows. Using the added benchmark, I adjusted the order of the conditions so that the expensive UDF is placed at the end of the predicate. Below are the results. The first is the original benchmark; the second has the UDF at the end of the predicate.
    
    
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.9.87-linuxkit-aufs
    Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
    sort merge inner range join:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    sort merge inner range join wholestage off      6913 / 6964          0.0     1080112.4       1.0X
    sort merge inner range join wholestage on      2094 / 2224          0.0      327217.4       3.3X
    ```
    
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.9.87-linuxkit-aufs
    Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
    sort merge inner range join:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    sort merge inner range join wholestage off       675 /  704          0.0      105493.9       1.0X
    sort merge inner range join wholestage on       374 /  398          0.0       58359.6       1.8X
    ```
    
    This is easily improved thanks to short-circuit evaluation of the predicate. It also applies to conditions other than range comparisons. So I'm wondering whether we need a way to give Spark a hint to adjust the order of expressions around an expensive one such as a UDF.
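    
    The effect of predicate order under short-circuit evaluation can be seen in a small plain-Scala sketch. This is not the benchmark from the PR; `expensiveUdf`, `inRange` and `evaluate` are hypothetical stand-ins, and a counter takes the place of real timing.
    
    ```scala
    // Sketch only: counts how often an "expensive" function runs depending on
    // where it sits in a conjunction. Names and data are illustrative assumptions.
    object ShortCircuitSketch {
      private var udfCalls = 0
    
      // Stand-in for an expensive UDF; each call is counted.
      def expensiveUdf(a: Int, b: Int): Boolean = {
        udfCalls += 1
        (a + b) % 2 == 0
      }
    
      // Cheap range condition: |a - b| <= d.
      def inRange(a: Int, b: Int, d: Int): Boolean = math.abs(a - b) <= d
    
      // Evaluates the predicate over all pairs in [0, 100) x [0, 100) and
      // returns (number of UDF calls, number of matching pairs).
      def evaluate(udfFirst: Boolean): (Int, Int) = {
        udfCalls = 0
        var matches = 0
        for (a <- 0 until 100; b <- 0 until 100) {
          val ok =
            if (udfFirst) expensiveUdf(a, b) && inRange(a, b, 1)
            else inRange(a, b, 1) && expensiveUdf(a, b)
          if (ok) matches += 1
        }
        (udfCalls, matches)
      }
    
      def main(args: Array[String]): Unit = {
        println(s"UDF first: ${evaluate(udfFirst = true)}")
        println(s"UDF last:  ${evaluate(udfFirst = false)}")
      }
    }
    ```
    
    Both orders return the same matches; with the UDF last, it runs only for pairs that already passed the cheap range check.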
    



---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188258635
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    --- End diff --
    
    Yes, will do


---



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

Posted by zecevicp <gi...@git.apache.org>.
Github user zecevicp commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193753965
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala ---
    @@ -117,101 +131,170 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
         }
     
         def makeSortMergeJoin(
    -        leftKeys: Seq[Expression],
    -        rightKeys: Seq[Expression],
    -        boundCondition: Option[Expression],
    -        leftPlan: SparkPlan,
    -        rightPlan: SparkPlan) = {
    -      val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition,
    -        leftPlan, rightPlan)
    +                           leftKeys: Seq[Expression],
    +                           rightKeys: Seq[Expression],
    +                           boundCondition: Option[Expression],
    +                           rangeConditions: Seq[BinaryComparison],
    +                           leftPlan: SparkPlan,
    +                           rightPlan: SparkPlan) = {
    +      val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, rangeConditions,
    +        boundCondition, leftPlan, rightPlan)
           EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
         }
     
    -    test(s"$testName using BroadcastHashJoin (build=left)") {
    -      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
    -        withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
    -          checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
    -            makeBroadcastHashJoin(
    -              leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft),
    -            expectedAnswer.map(Row.fromTuple),
    -            sortAnswers = true)
    +    val configOptions = List(
    +      ("spark.sql.codegen.wholeStage", "true"),
    +      ("spark.sql.codegen.wholeStage", "false"))
    +
    +    // Disabling these because the code would never follow this path in case of a inner range join
    +    if (!expectRangeJoin) {
    +      var counter = 1
    --- End diff --
    
    OK, will do that.


---
