Posted to commits@spark.apache.org by hu...@apache.org on 2023/06/02 17:48:49 UTC

[spark] branch master updated: [SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0effbec16ed [SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join
0effbec16ed is described below

commit 0effbec16edc27c644d4089bdf266cd4ecbed235
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Fri Jun 2 10:48:32 2023 -0700

    [SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join
    
    ### What changes were proposed in this pull request?
    Add support for shuffle-hash join for the following scenarios:
    
    * left outer join with left-side build
    * right outer join with right-side build
    
    The algorithm is similar to that of SPARK-32399, which added shuffle-hash join support for full outer join.
    
    The same methods fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey are extended to support the new cases. These methods are called after the HashedRelation for the build side has already been constructed, and perform these two iterations:
    
    1. Iterate the stream side.
      a. If a match is found on the build side, mark the build row and add the joined row to the result.
      b. If there is no match on the build side, join with a null build-side row and add to the result.
    2. Iterate the build side.
      a. If the row was marked as matched, skip it (its joined row was already added in step 1).
      b. If the row was not marked, join it with a null stream-side row and add to the result.
    
    The left outer join with left-side build, and the right outer join with right-side build, need only a subset of this logic: step 1b above becomes a no-op, since an unmatched stream-side (inner-side) row produces no output for these join types.
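    
    Below is a minimal, self-contained Scala sketch of this two-pass scheme (not the actual ShuffledHashJoinExec code): it uses plain collections in place of Spark's HashedRelation, the names and the isFullOuterJoin flag only loosely mirror the PR, and the extra join-condition check is omitted.
    
    ```scala
    import java.util.BitSet

    // Hypothetical sketch only: build side is the outer side, stream side is
    // the inner side. Spark's real code works on InternalRow/HashedRelation.
    object OuterBuildJoinSketch {
      def joinBuildOuter[K, L, R](
          stream: Iterator[(K, R)],
          build: IndexedSeq[(K, L)],
          isFullOuterJoin: Boolean): Iterator[(Option[L], Option[R])] = {
        // Index build rows by key, remembering each row's position so matches
        // can be marked (a stand-in for the key-index bit set in the PR).
        val byKey = build.zipWithIndex.groupBy { case ((k, _), _) => k }
        val matched = new BitSet(build.size)

        // Pass 1: iterate the stream side, marking matched build rows (1a).
        val streamPass: Iterator[(Option[L], Option[R])] =
          stream.flatMap { case (k, r) =>
            byKey.get(k) match {
              case Some(hits) =>
                hits.iterator.map { case ((_, l), idx) =>
                  matched.set(idx)
                  (Some(l), Some(r))
                }
              case None =>
                // 1b: only a full outer join emits the null-build row; for
                // the build-side outer joins added here, this is a no-op.
                if (isFullOuterJoin) Iterator.single((None, Some(r)))
                else Iterator.empty
            }
          }

        // Pass 2: iterate the build side, skipping matched rows (2a) and
        // joining unmatched rows with a null stream side (2b). Iterator.++
        // is lazy, so this runs only after pass 1 has populated `matched`.
        def buildPass: Iterator[(Option[L], Option[R])] =
          build.iterator.zipWithIndex.collect {
            case ((_, l), idx) if !matched.get(idx) => (Some(l), None)
          }

        streamPass ++ buildPass
      }

      def main(args: Array[String]): Unit = {
        val build = IndexedSeq((1, "b1"), (2, "b2"))  // left (outer) side
        val stream = Iterator((2, "s2"), (3, "s3"))   // right (inner) side
        // Left outer join, build left: (3, "s3") is dropped, (1, "b1") is
        // emitted with a null stream side.
        joinBuildOuter(stream, build, isFullOuterJoin = false).foreach(println)
        // (Some(b2),Some(s2))
        // (Some(b1),None)
      }
    }
    ```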
    
    Codegen is left for a follow-up PR.
    
    ### Why are the changes needed?
    For joins of these types, shuffle-hash join can be more performant than sort-merge join, especially when the stream-side table is large, as it skips an expensive sort of that table.
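    
    As an illustrative usage sketch (table names and sizes are made up, loosely mirroring the new tests in JoinSuite.scala): with this change, a SHUFFLE_HASH hint on the outer side of a left outer join can be honored with a left-side build, where previously the hint could not be applied for that build side.
    
    ```scala
    import org.apache.spark.sql.SparkSession

    object ShjOuterBuildExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("shj-outer-build")
          .getOrCreate()
        import spark.implicits._

        val outer = spark.range(10).selectExpr("id as k1")    // build side
        val inner = spark.range(1000).selectExpr("id as k2")  // stream side

        val df = outer.hint("SHUFFLE_HASH")
          .join(inner, $"k1" === $"k2", "leftouter")
        df.explain()  // expect ShuffledHashJoin ... BuildLeft in the plan
        df.show(5)

        spark.stop()
      }
    }
    ```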
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Unit test in JoinSuite.scala
    
    Closes #41398 from szehon-ho/same_side_outer_build_join_master.
    
    Authored-by: Szehon Ho <sz...@gmail.com>
    Signed-off-by: huaxingao <hu...@apple.com>
---
 .../spark/sql/catalyst/optimizer/joins.scala       |  4 +-
 .../spark/sql/execution/joins/HashedRelation.scala |  1 -
 .../sql/execution/joins/ShuffledHashJoinExec.scala | 74 ++++++++++++++-------
 .../scala/org/apache/spark/sql/JoinHintSuite.scala | 30 ++++++---
 .../scala/org/apache/spark/sql/JoinSuite.scala     | 77 ++++++++++++++++++++++
 5 files changed, 151 insertions(+), 35 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 972be43a946..48b4007a897 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -374,14 +374,14 @@ trait JoinSelectionHelper {
 
   def canBuildShuffledHashJoinLeft(joinType: JoinType): Boolean = {
     joinType match {
-      case _: InnerLike | RightOuter | FullOuter => true
+      case _: InnerLike | LeftOuter | FullOuter | RightOuter => true
       case _ => false
     }
   }
 
   def canBuildShuffledHashJoinRight(joinType: JoinType): Boolean = {
     joinType match {
-      case _: InnerLike | LeftOuter | FullOuter |
+      case _: InnerLike | LeftOuter | FullOuter | RightOuter |
            LeftSemi | LeftAnti | _: ExistenceJoin => true
       case _ => false
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 4d3e63282fa..16345bb35db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -127,7 +127,6 @@ private[execution] object HashedRelation {
    * Create a HashedRelation from an Iterator of InternalRow.
    *
    * @param allowsNullKey        Allow NULL keys in HashedRelation.
-   *                             This is used for full outer join in `ShuffledHashJoinExec` only.
    * @param ignoresDuplicatedKey Ignore rows with duplicated keys in HashedRelation.
    *                             This is only used for semi and anti join without join condition in
    *                             `ShuffledHashJoinExec` only.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index cfe35d04778..8953bf19f35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -56,7 +56,12 @@ case class ShuffledHashJoinExec(
   override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning
 
   override def outputOrdering: Seq[SortOrder] = joinType match {
+    // For outer joins where the outer side is build-side, order cannot be guaranteed.
+    // The algorithm performs an additional un-ordered iteration on build-side (HashedRelation)
+    // to find unmatched rows to satisfy the outer join semantic.
     case FullOuter => Nil
+    case LeftOuter if buildSide == BuildLeft => Nil
+    case RightOuter if buildSide == BuildRight => Nil
     case _ => super.outputOrdering
   }
 
@@ -83,8 +88,10 @@ case class ShuffledHashJoinExec(
       iter,
       buildBoundKeys,
       taskMemoryManager = context.taskMemoryManager(),
-      // Full outer join needs support for NULL key in HashedRelation.
-      allowsNullKey = joinType == FullOuter,
+      // build-side or full outer join needs support for NULL key in HashedRelation.
+      allowsNullKey = joinType == FullOuter ||
+        (joinType == LeftOuter && buildSide == BuildLeft) ||
+        (joinType == RightOuter && buildSide == BuildRight),
       ignoresDuplicatedKey = ignoreDuplicatedKey)
     buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
     buildDataSize += relation.estimatedSize
@@ -98,16 +105,22 @@ case class ShuffledHashJoinExec(
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
       joinType match {
-        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case FullOuter => buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows,
+          isFullOuterJoin = true)
+        case LeftOuter if buildSide.equals(BuildLeft) =>
+          buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
+        case RightOuter if buildSide.equals(BuildRight) =>
+          buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
         case _ => join(streamIter, hashed, numOutputRows)
       }
     }
   }
 
-  private def fullOuterJoin(
+  private def buildSideOrFullOuterJoin(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation,
-      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric,
+      isFullOuterJoin: Boolean): Iterator[InternalRow] = {
     val joinKeys = streamSideKeyGenerator()
     val joinRow = new JoinedRow
     val (joinRowWithStream, joinRowWithBuild) = {
@@ -130,11 +143,11 @@ case class ShuffledHashJoinExec(
     }
 
     val iter = if (hashedRelation.keyIsUnique) {
-      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
+      buildSideOrFullOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
     } else {
-      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
+      buildSideOrFullOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
     }
 
     val resultProj = UnsafeProjection.create(output, output)
@@ -145,7 +158,7 @@ case class ShuffledHashJoinExec(
   }
 
   /**
-   * Full outer shuffled hash join with unique join keys:
+   * Shuffled hash join with unique join keys, where an outer side is the build side.
    * 1. Process rows from stream side by looking up hash relation.
    *    Mark the matched rows from build side be looked up.
    *    A bit set is used to track matched rows with key index.
@@ -153,23 +166,30 @@ case class ShuffledHashJoinExec(
    *    Filter out rows from build side being matched already,
    *    by checking key index from bit set.
    */
-  private def fullOuterJoinWithUniqueKey(
+  private def buildSideOrFullOuterJoinUniqueKey(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation,
       joinKeys: UnsafeProjection,
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow,
+      isFullOuterJoin: Boolean): Iterator[InternalRow] = {
     val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
     longMetric("buildDataSize") += matchedKeys.capacity / 8
 
+    def noMatch = if (isFullOuterJoin) {
+      Some(joinRowWithBuild(buildNullRow))
+    } else {
+      None
+    }
+
     // Process stream side with looking up hash relation
-    val streamResultIter = streamIter.map { srow =>
+    val streamResultIter = streamIter.flatMap { srow =>
       joinRowWithStream(srow)
       val keys = joinKeys(srow)
       if (keys.anyNull) {
-        joinRowWithBuild(buildNullRow)
+        noMatch
       } else {
         val matched = hashedRelation.getValueWithKeyIndex(keys)
         if (matched != null) {
@@ -178,12 +198,12 @@ case class ShuffledHashJoinExec(
           val joinRow = joinRowWithBuild(buildRow)
           if (boundCondition(joinRow)) {
             matchedKeys.set(keyIndex)
-            joinRow
+            Some(joinRow)
           } else {
-            joinRowWithBuild(buildNullRow)
+            noMatch
           }
         } else {
-          joinRowWithBuild(buildNullRow)
+          noMatch
         }
       }
     }
@@ -205,7 +225,7 @@ case class ShuffledHashJoinExec(
   }
 
   /**
-   * Full outer shuffled hash join with non-unique join keys:
+   * Shuffled hash join with non-unique join keys, where an outer side is the build side.
    * 1. Process rows from stream side by looking up hash relation.
    *    Mark the matched rows from build side be looked up.
    *    A [[OpenHashSet]] (Long) is used to track matched rows with
@@ -219,14 +239,15 @@ case class ShuffledHashJoinExec(
    * the value indices of its tuples will be 0, 1 and 2.
    * Note that value indices of tuples with different keys are incomparable.
    */
-  private def fullOuterJoinWithNonUniqueKey(
+  private def buildSideOrFullOuterJoinNonUniqueKey(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation,
       joinKeys: UnsafeProjection,
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow,
+      isFullOuterJoin: Boolean): Iterator[InternalRow] = {
     val matchedRows = new OpenHashSet[Long]
     TaskContext.get().addTaskCompletionListener[Unit](_ => {
       // At the end of the task, update the task's memory usage for this
@@ -252,7 +273,12 @@ case class ShuffledHashJoinExec(
       val joinRow = joinRowWithStream(srow)
       val keys = joinKeys(srow)
       if (keys.anyNull) {
-        Iterator.single(joinRowWithBuild(buildNullRow))
+        // return row with build side NULL row to satisfy full outer join semantics if enabled
+        if (isFullOuterJoin) {
+          Iterator.single(joinRowWithBuild(buildNullRow))
+        } else {
+          Iterator.empty
+        }
       } else {
         val buildIter = hashedRelation.getWithKeyIndex(keys)
         new RowIterator {
@@ -272,8 +298,8 @@ case class ShuffledHashJoinExec(
             }
             // When we reach here, it means no match is found for this key.
             // So we need to return one row with build side NULL row,
-            // to satisfy the full outer join semantic.
-            if (!found) {
+            // to satisfy the full outer join semantic if enabled.
+            if (!found && isFullOuterJoin) {
               joinRowWithBuild(buildNullRow)
               // Set `found` to be true as we only need to return one row
               // but no more.
@@ -314,6 +340,8 @@ case class ShuffledHashJoinExec(
 
   override def supportCodegen: Boolean = joinType match {
     case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
+    case LeftOuter if buildSide == BuildLeft => false
+    case RightOuter if buildSide == BuildRight => false
     case _ => true
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
index 1792b4c32eb..7af826583bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
@@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
         assertShuffleHashJoin(
           sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft)
         assertShuffleHashJoin(
-          sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight)
+          sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft)
         assertShuffleHashJoin(
           sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft)
 
+        // Determine build side based on hint
+        assertShuffleHashJoin(
+          sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft)
+        assertShuffleHashJoin(
+          sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight)
+
         // Shuffle-hash hint prioritized over shuffle-replicate-nl hint
         assertShuffleHashJoin(
           sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)),
@@ -507,8 +513,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
           BuildLeft)
 
         // Shuffle-hash hint specified but not doable
-        assertBroadcastHashJoin(
-          sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight)
         assertBroadcastNLJoin(
           sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft)
       }
@@ -606,13 +610,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
         withLogAppender(hintAppender, level = Some(Level.WARN)) {
           assertShuffleMergeJoin(
             df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType))
+        }
+
+        val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+          .filter(_.contains("is not supported in the query:"))
+        assert(logs.size === 1)
+        logs.foreach(log =>
+          assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
+      }
+
+      Seq("left_semi", "left_anti").foreach { joinType =>
+        val hintAppender = new LogAppender(s"join hint build side check for $joinType")
+        withLogAppender(hintAppender, level = Some(Level.WARN)) {
           assertShuffleMergeJoin(
             df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType))
         }
 
         val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
           .filter(_.contains("is not supported in the query:"))
-        assert(logs.size === 2)
+        assert(logs.size === 1)
         logs.foreach(log =>
           assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
       }
@@ -622,8 +638,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
         withLogAppender(hintAppender, level = Some(Level.WARN)) {
           assertBroadcastHashJoin(
             df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight)
-          assertShuffleHashJoin(
-            df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight)
         }
 
         val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
@@ -636,12 +650,10 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
         withLogAppender(hintAppender, level = Some(Level.WARN)) {
           assertShuffleMergeJoin(
             df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType))
-          assertShuffleMergeJoin(
-            df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType))
         }
         val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
           .filter(_.contains("is not supported in the query:"))
-        assert(logs.size === 2)
+        assert(logs.size === 1)
         logs.foreach(log =>
           assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join.")))
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 2c242965339..12e3f2eb202 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1249,6 +1249,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 
+  test("SPARK-36612: Support left outer join build left or right outer join build right in " +
+    "shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test empty build side
+      (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test empty stream side
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+        $"k1" === $"k2"),
+      // Test empty build and stream side
+      (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+        spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+        "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
+        spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr(
+          "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
+        $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+    )
+
+    // test left outer with left side build
+    inputDFs.foreach { case (df1, df2, joinExprs) =>
+      val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true
+      }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
+    }
+
+    // test right outer with right side build
+    inputDFs.foreach { case (df2, df1, joinExprs) =>
+      val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true
+      }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
+    }
+  }
+
   test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") {
     val inputDFs = Seq(
       // Test empty build side for inner join

