Posted to reviews@spark.apache.org by "caican00 (via GitHub)" <gi...@apache.org> on 2023/07/14 10:31:49 UTC

[GitHub] [spark] caican00 opened a new pull request, #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

caican00 opened a new pull request, #42003:
URL: https://github.com/apache/spark/pull/42003

   ### What changes were proposed in this pull request?
   Adds automatic skew handling for joins of type ExistenceJoin. Only skew on the left side of the join is handled.
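
A minimal sketch of why restricting the optimization to the left side is reasonable, assuming the usual existence-join semantics (each left row is emitted exactly once with a boolean flag). The object name, the toy keys, and the helper functions are hypothetical and model the idea in plain Scala; this is not Spark's implementation.

```scala
object LeftSplitSketch {
  // Hypothetical model: a row is reduced to its join key.
  type Key = Int

  // Existence join: every left row appears exactly once, paired with a flag
  // that says whether at least one right row has the same key.
  def existenceJoin(left: Seq[Key], right: Seq[Key]): Seq[(Key, Boolean)] = {
    val rightKeys = right.toSet
    left.map(k => (k, rightKeys.contains(k)))
  }

  // Split the left partition into chunks; each chunk still sees the full right side.
  def splitLeft(left: Seq[Key], right: Seq[Key], chunkSize: Int): Seq[(Key, Boolean)] =
    left.grouped(chunkSize).toList.flatMap(chunk => existenceJoin(chunk, right))

  // Split the right partition instead; every left row is now joined once per chunk.
  def splitRight(left: Seq[Key], right: Seq[Key], chunkSize: Int): Seq[(Key, Boolean)] =
    right.grouped(chunkSize).toList.flatMap(chunk => existenceJoin(left, chunk))

  val left: Seq[Key] = Seq(1, 1, 1, 1, 2, 3) // key 1 is the "skewed" key
  val right: Seq[Key] = Seq(1, 2, 4, 5)
  val expected: Seq[(Key, Boolean)] = existenceJoin(left, right)

  def main(args: Array[String]): Unit = {
    println(splitLeft(left, right, 2) == expected)  // left split keeps the result
    println(splitRight(left, right, 2) == expected) // right split does not
  }
}
```

In this model, splitting the left side only changes how the work is divided, while splitting the right side duplicates left rows and can attach a wrong `false` flag to a row whose match lives in another chunk.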
   
   
   ### Why are the changes needed?
   Adaptive skew-join optimization does not currently cover ExistenceJoin, so when such a join hits skewed data the application runs very slowly.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Data skew in the left table of an ExistenceJoin is now handled automatically.
   
   ### How was this patch tested?
   New unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] caican00 commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "caican00 (via GitHub)" <gi...@apache.org>.
caican00 commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1271791289


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {

Review Comment:
   Thanks, updated.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {

Review Comment:
   Thanks, updated.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewData1")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 2 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
+          assert(joins.last.right.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
+        }
+
+        // skewed ExistenceJoin optimization for left side
+        val (_, existenceAdaptivePlanForLeft) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData1
+             |where
+             |(key1 in (select key2 from skewData2)
+             |or value1 in (select value2 from skewData2)
+             |)""".stripMargin)
+        val existenceSmjForLeft = findTopLevelSortMergeJoin(existenceAdaptivePlanForLeft)
+        assert(existenceSmjForLeft.nonEmpty &&
+          existenceSmjForLeft.last.joinType.isInstanceOf[ExistenceJoin])
+        checkSkewJoin(existenceSmjForLeft, 2, 0)
+
+        // forbid skewed ExistenceJoin optimization for right side
+        val (_, existenceAdaptivePlanForRight) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData2
+             |where
+             |(key2 in (select key1 from skewData1)
+             |or value2 in (select value1 from skewData1)
+             |)""".stripMargin)

Review Comment:
   Updated.





Re: [PR] [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin
URL: https://github.com/apache/spark/pull/42003




[GitHub] [spark] caican00 commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "caican00 (via GitHub)" <gi...@apache.org>.
caican00 commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1271791374


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewData1")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 2 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
+          assert(joins.last.right.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
+        }
+
+        // skewed ExistenceJoin optimization for left side
+        val (_, existenceAdaptivePlanForLeft) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData1
+             |where
+             |(key1 in (select key2 from skewData2)
+             |or value1 in (select value2 from skewData2)
+             |)""".stripMargin)
+        val existenceSmjForLeft = findTopLevelSortMergeJoin(existenceAdaptivePlanForLeft)
+        assert(existenceSmjForLeft.nonEmpty &&
+          existenceSmjForLeft.last.joinType.isInstanceOf[ExistenceJoin])
+        checkSkewJoin(existenceSmjForLeft, 2, 0)
+
+        // forbid skewed ExistenceJoin optimization for right side
+        val (_, existenceAdaptivePlanForRight) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData2
+             |where
+             |(key2 in (select key1 from skewData1)
+             |or value2 in (select value1 from skewData1)
+             |)""".stripMargin)

Review Comment:
   Updated.





[GitHub] [spark] caican00 commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "caican00 (via GitHub)" <gi...@apache.org>.
caican00 commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1271791289


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {

Review Comment:
   @ulysses-you Thanks, updated.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewData1")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 2 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
+          assert(joins.last.right.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
+        }
+
+        // skewed ExistenceJoin optimization for left side
+        val (_, existenceAdaptivePlanForLeft) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData1
+             |where
+             |(key1 in (select key2 from skewData2)
+             |or value1 in (select value2 from skewData2)
+             |)""".stripMargin)
+        val existenceSmjForLeft = findTopLevelSortMergeJoin(existenceAdaptivePlanForLeft)
+        assert(existenceSmjForLeft.nonEmpty &&
+          existenceSmjForLeft.last.joinType.isInstanceOf[ExistenceJoin])
+        checkSkewJoin(existenceSmjForLeft, 2, 0)
+
+        // forbid skewed ExistenceJoin optimization for right side
+        val (_, existenceAdaptivePlanForRight) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData2
+             |where
+             |(key2 in (select key1 from skewData1)
+             |or value2 in (select value1 from skewData1)
+             |)""".stripMargin)

Review Comment:
   updated.





Re: [PR] [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #42003:
URL: https://github.com/apache/spark/pull/42003#issuecomment-1789865574

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] ulysses-you commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1271641313


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewData1")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 2 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
+          assert(joins.last.right.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
+        }
+
+        // skewed ExistenceJoin optimization for left side
+        val (_, existenceAdaptivePlanForLeft) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData1
+             |where
+             |(key1 in (select key2 from skewData2)
+             |or value1 in (select value2 from skewData2)
+             |)""".stripMargin)

Review Comment:
   ```sql
   SELECT * FROM nonSkewed WHERE key2 IN (SELECT key1 FROM skewed) OR key2 > 1
   ```
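
As a rough illustration of why a query shaped like the one above needs an existence join rather than a plain semi join, the sketch below evaluates `IN (subquery) OR predicate` in two steps: first compute an `exists` flag per outer row, then filter on the disjunction. The object name and the toy key values are hypothetical; the real test uses 1000-row temp views and Spark's own planner.

```scala
object InOrPredicateSketch {
  // Hypothetical toy data standing in for the two temp views.
  val nonSkewedKeys: Seq[Int] = Seq(0, 1, 2, 3)
  val skewedKeys: Set[Int] = Set(1, 7)

  // Step 1: the existence join keeps every outer row and adds an `exists` flag.
  val withFlag: Seq[(Int, Boolean)] =
    nonSkewedKeys.map(k => (k, skewedKeys.contains(k)))

  // Step 2: the OR is an ordinary filter over the flag and the other predicate.
  val result: Seq[Int] =
    withFlag.collect { case (k, exists) if exists || k > 1 => k }

  // A semi join alone keeps only matching rows, so it loses rows that
  // satisfy just the `key2 > 1` branch.
  val semiJoinOnly: Seq[Int] = nonSkewedKeys.filter(skewedKeys.contains)

  def main(args: Array[String]): Unit = {
    println(result)
    println(semiJoinOnly)
  }
}
```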



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {

Review Comment:
   `skewData1` -> `skewed`, `skewData2` -> `nonSkewed`
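
To make the proposed names concrete, here is a small plain-Scala sketch of the key distribution that the two views in the test produce. `key1` mirrors the `when(...).when(...).otherwise(...)` expression from the diff; the object name and the derived values are illustrative only and do not touch Spark.

```scala
object SkewKeySketch {
  // Mirrors the key1 expression in the test: ids below 250 collapse to 249,
  // ids from 750 upward collapse to 1000, everything else keeps its id.
  def key1(id: Long): Long =
    if (id < 250) 249L
    else if (id >= 750) 1000L
    else id

  // Same id range as spark.range(0, 1000, 1, 10).
  val ids: Seq[Long] = 0L until 1000L

  // Rows per join key in the first view (key1) and the second view (key2 = id).
  val skewedCounts: Map[Long, Int] =
    ids.groupBy(id => key1(id)).map { case (k, rows) => k -> rows.size }
  val nonSkewedCounts: Map[Long, Int] =
    ids.groupBy(id => id).map { case (k, rows) => k -> rows.size }

  // Keys that carry more than one row.
  val heavyKeys: Seq[Long] =
    skewedCounts.collect { case (k, n) if n > 1 => k }.toSeq.sorted

  def main(args: Array[String]): Unit = {
    println(heavyKeys.map(k => k -> skewedCounts(k)))
    println(nonSkewedCounts.values.max)
  }
}
```

Two keys each hold a quarter of the rows in the first view, while every key in the second view appears once. That is consistent with the test expecting two skewed partitions on the left, assuming the two heavy keys hash to different shuffle partitions.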



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewData1")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 2 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
+          assert(joins.last.right.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
+        }
+
+        // skewed ExistenceJoin optimization for left side
+        val (_, existenceAdaptivePlanForLeft) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData1
+             |where
+             |(key1 in (select key2 from skewData2)
+             |or value1 in (select value2 from skewData2)
+             |)""".stripMargin)
+        val existenceSmjForLeft = findTopLevelSortMergeJoin(existenceAdaptivePlanForLeft)
+        assert(existenceSmjForLeft.nonEmpty &&
+          existenceSmjForLeft.last.joinType.isInstanceOf[ExistenceJoin])
+        checkSkewJoin(existenceSmjForLeft, 2, 0)
+
+        // forbid skewed ExistenceJoin optimization for right side
+        val (_, existenceAdaptivePlanForRight) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData2
+             |where
+             |(key2 in (select key1 from skewData1)
+             |or value2 in (select value1 from skewData1)
+             |)""".stripMargin)

Review Comment:
   ```sql
   SELECT * FROM skewed WHERE key1 IN (SELECT key2 FROM nonSkewed) OR key1 > 1
   ```





[GitHub] [spark] zhengruifeng commented on pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #42003:
URL: https://github.com/apache/spark/pull/42003#issuecomment-1641436876

   cc @ulysses-you 




[GitHub] [spark] caican00 commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "caican00 (via GitHub)" <gi...@apache.org>.
caican00 commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1271800576


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,81 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewData1")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 2 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
+          assert(joins.last.right.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
+        }
+
+        // skewed ExistenceJoin optimization for left side
+        val (_, existenceAdaptivePlanForLeft) = runAdaptiveAndVerifyResult(
+          s"""
+             |SELECT * FROM skewData1
+             |where
+             |(key1 in (select key2 from skewData2)
+             |or value1 in (select value2 from skewData2)
+             |)""".stripMargin)

Review Comment:
   updated.





[GitHub] [spark] caican00 commented on pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "caican00 (via GitHub)" <gi...@apache.org>.
caican00 commented on PR #42003:
URL: https://github.com/apache/spark/pull/42003#issuecomment-1637279382

   Could you help review this PR?
   Gentle ping @cloud-fan.
   Thanks.




[GitHub] [spark] ulysses-you commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1267617871


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,96 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2", "skewData3", "skewData4") {

Review Comment:
   Can we simplify and clarify the test a bit? We can test a single join in which one side is a skewed table and the other is not, then swap the sides to verify what happens when the skewed table is on the left or on the right.
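
For context on what "skewed" means for such a test, the sketch below models the documented adaptive skew-join rule: a partition counts as skewed when its size exceeds both the configured threshold and a factor times the median partition size. The object name, the simplified median, the factor of 5, and the partition sizes are all assumptions for illustration; only the 800-byte threshold comes from the test configuration.

```scala
object SkewDetectionSketch {
  // Simplified median (upper median for even counts); an assumption of this sketch.
  def median(sizes: Seq[Long]): Long = {
    val sorted = sizes.sorted
    sorted(sorted.length / 2)
  }

  // Indices of partitions larger than both the absolute threshold and
  // `factor` times the median size.
  def skewedIndices(sizes: Seq[Long], thresholdBytes: Long, factor: Double): Seq[Int] = {
    val cutoff = math.max(thresholdBytes, (median(sizes) * factor).toLong)
    sizes.zipWithIndex.collect { case (size, i) if size > cutoff => i }
  }

  // Hypothetical shuffle partition sizes in bytes, not measured from the test.
  val skewedSide: Seq[Long] = Seq(100L, 120L, 4000L, 110L, 3900L, 90L)
  val uniformSide: Seq[Long] = Seq(100L, 120L, 130L, 110L, 105L, 90L)

  def main(args: Array[String]): Unit = {
    println(skewedIndices(skewedSide, 800L, 5.0))
    println(skewedIndices(uniformSide, 800L, 5.0))
  }
}
```

With one skewed input and one uniform input, only one side of the join ever has partitions to split, which is what makes swapping the sides a clean way to exercise the left and right cases.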





[GitHub] [spark] ulysses-you commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1272014163


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,69 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewed", "nonSkewed") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewed")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("nonSkewed")
+
+        def checkSkewJoin(

Review Comment:
   Can we inline this function? It seems to be used only once.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,69 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewed", "nonSkewed") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewed")
+        // skewData2
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            'id as "key2",
+            'id as "value2")
+          .createOrReplaceTempView("nonSkewed")
+
+        def checkSkewJoin(
+                           joins: Seq[SortMergeJoinExec],
+                           leftSkewNum: Int,
+                           rightSkewNum: Int): Unit = {
+          assert(joins.size == 1 && joins.last.isSkewJoin)
+          assert(joins.last.left.collect {
+            case r: AQEShuffleReadExec => r
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)

Review Comment:
   ```scala
   assert(joins.last.left.exists {
     case AQEShuffleReadExec(_, p) if p.exists(_.isInstanceOf[PartialReducerPartitionSpec]) => true
     case _ => false // every other plan node: without this default the match throws MatchError
   })
   ```
   
   We do not really need to check the number of skewed partition specs.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,69 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewed", "nonSkewed") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
+          .createOrReplaceTempView("skewed")
+        // skewData2

Review Comment:
   Please remove this comment.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,69 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewed", "nonSkewed") {
+        // skewData1
+        spark
+          .range(0, 1000, 1, 10)
+          .select(
+            when('id < 250, 249)

Review Comment:
   Please do not use `'` to represent a symbol; that syntax is deprecated. You can use `$"id"` instead.
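
   For illustration, a minimal sketch of the same skewed data built with `$"id"` instead of the symbol literal. The object name `SkewedViewSketch`, the helper `skewed`, and the local `SparkSession` are assumptions made so that the snippet stands alone; inside the suite, `spark` and the implicits are already in scope.

   ```scala
   import org.apache.spark.sql.{DataFrame, SparkSession}
   import org.apache.spark.sql.functions.when

   // Hypothetical standalone wrapper; not part of the PR.
   object SkewedViewSketch {
     // Same data as in the test: ids below 250 collapse onto key 249 and
     // ids from 750 upwards collapse onto key 1000.
     def skewed(spark: SparkSession): DataFrame = {
       import spark.implicits._ // brings the $"..." column interpolator into scope
       spark
         .range(0, 1000, 1, 10)
         .select(
           when($"id" < 250, 249)
             .when($"id" >= 750, 1000)
             .otherwise($"id").as("key1"),
           $"id".as("value1"))
     }

     def main(args: Array[String]): Unit = {
       // Local session for illustration only.
       val spark = SparkSession.builder()
         .master("local[2]")
         .appName("skew-sketch")
         .getOrCreate()
       try {
         skewed(spark).createOrReplaceTempView("skewed")
         // Show the heaviest keys to make the skew visible.
         spark.sql(
           "SELECT key1, count(*) AS cnt FROM skewed GROUP BY key1 ORDER BY cnt DESC").show(3)
       } finally {
         spark.stop()
       }
     }
   }
   ```

   The only change relative to the diff is the column syntax; the resulting data and view are the same.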



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,69 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewed", "nonSkewed") {
+        // skewData1

Review Comment:
   Please remove this comment.





[GitHub] [spark] caican00 commented on a diff in pull request #42003: [SPARK-44426][SQL] Optimize adaptive skew join for ExistenceJoin

Posted by "caican00 (via GitHub)" <gi...@apache.org>.
caican00 commented on code in PR #42003:
URL: https://github.com/apache/spark/pull/42003#discussion_r1271294044


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -874,6 +875,96 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-44426: Optimize adaptive skew join for ExistenceJoin") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      withTempView("skewData1", "skewData2", "skewData3", "skewData4") {

Review Comment:
   > Can we simplify and clarify the test a bit? We can write the test with a single join where one side is a skewed table and the other side is not. Then we can switch the sides to verify what happens when the skewed side is on the left or on the right.
   
   @ulysses-you This has been modified. Thanks for reviewing.


