Posted to reviews@spark.apache.org by "wankunde (via GitHub)" <gi...@apache.org> on 2023/12/02 06:08:21 UTC

[PR] [SPARK-40609][SQL] Unwrap cast in the join condition to unlock bucketed read [spark]

wankunde opened a new pull request, #44120:
URL: https://github.com/apache/spark/pull/44120

   ### What changes were proposed in this pull request?
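   Adding a cast on bucket keys disables the bucketed read. In the example below, the join keys have different types (`bigint` and `decimal(18, 0)`), so both sides are cast to `decimal(20,0)` and both bucketed scans fall back to a shuffle: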
   
   ```sql
   set spark.sql.autoBroadcastJoinThreshold=-1;
   CREATE TABLE t2 USING parquet CLUSTERED BY (i) INTO 8 buckets AS
   SELECT CAST(v AS bigint) AS i FROM values(1), (9223372036854775807) AS data(v);
   
   CREATE TABLE t3 USING parquet CLUSTERED BY (i) INTO 4 buckets AS
   SELECT CAST(v AS decimal(18, 0)) AS i FROM values(1), (999999999999999999) AS data(v);
   
   EXPLAIN SELECT * FROM t2 JOIN t3 ON t2.i = t3.i;
   ```
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- SortMergeJoin [cast(i#6L as decimal(20,0))], [cast(i#19 as decimal(20,0))], Inner
      :- Sort [cast(i#6L as decimal(20,0)) ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(cast(i#6L as decimal(20,0)), 5), ENSURE_REQUIREMENTS, [plan_id=128]
      :     +- Filter isnotnull(i#6L)
      :        +- FileScan parquet spark_catalog.default.t2[i#6L] Batched: true, Bucketed: false (disabled by query planner)
      +- Sort [cast(i#19 as decimal(20,0)) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(cast(i#19 as decimal(20,0)), 5), ENSURE_REQUIREMENTS, [plan_id=132]
            +- Filter isnotnull(i#19)
               +- FileScan parquet spark_catalog.default.t3[i#19] Batched: true, Bucketed: false (disabled by query planner)
   ```
   
   This PR adds a new rule, `UnwrapCastInJoinCondition`, that runs before `EnsureRequirements` and unwraps the cast in the join condition when the join keys are integral types, so that a bucketed read can be used. The key idea is that, for integral join keys, casting to either of the two key types does not change the join result. For example, consider `a.intCol = try_cast(b.bigIntCol AS int)`: if `bigIntCol` is outside the range of int, `try_cast(b.bigIntCol AS int)` returns `null`, so the join condition evaluates to `null` and the rows do not match. This is consistent with `cast(a.intCol AS bigint) = b.bigIntCol`, which is also false in that case, because no int value can equal a bigint outside the int range.
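
The equivalence argument above can be checked with a small Python model of SQL semantics. This is a sketch, not Spark code: `try_cast_to_int`, `wrapping_cast_to_int`, `sql_eq`, `widened`, `unwrapped` and `agree` are hypothetical helpers, and `wrapping_cast_to_int` assumes that a non-ANSI `CAST(bigint AS int)` keeps the low 32 bits. It compares which (`intCol`, `bigIntCol`) pairs each join condition keeps over a set of boundary values.

```python
from itertools import product
from typing import Optional

# Bounds of a 32-bit int and a 64-bit bigint.
INT_MIN, INT_MAX = -(2**31), 2**31 - 1
BIGINT_MIN, BIGINT_MAX = -(2**63), 2**63 - 1


def try_cast_to_int(v: Optional[int]) -> Optional[int]:
    """Model of try_cast(v AS int): NULL for NULL input or out-of-range values."""
    if v is None or not INT_MIN <= v <= INT_MAX:
        return None
    return v


def wrapping_cast_to_int(v: Optional[int]) -> Optional[int]:
    """ASSUMED model of a non-ANSI narrowing cast: keep the low 32 bits."""
    if v is None:
        return None
    return (v - INT_MIN) % 2**32 + INT_MIN


def sql_eq(a: Optional[int], b: Optional[int]) -> Optional[bool]:
    """SQL `=` under three-valued logic: NULL if either operand is NULL."""
    if a is None or b is None:
        return None
    return a == b


def matches(cond: Optional[bool]) -> bool:
    """An inner join keeps a row pair only when the condition is TRUE."""
    return cond is True


def widened(int_col: Optional[int], big_int_col: Optional[int]) -> bool:
    # cast(a.intCol AS bigint) = b.bigIntCol; widening int -> bigint is lossless.
    return matches(sql_eq(int_col, big_int_col))


def unwrapped(int_col: Optional[int], big_int_col: Optional[int]) -> bool:
    # a.intCol = try_cast(b.bigIntCol AS int)
    return matches(sql_eq(int_col, try_cast_to_int(big_int_col)))


def unwrapped_wrapping(int_col: Optional[int], big_int_col: Optional[int]) -> bool:
    # a.intCol = cast(b.bigIntCol AS int), using the assumed wrap-around cast.
    return matches(sql_eq(int_col, wrapping_cast_to_int(big_int_col)))


INT_SAMPLES = [None, INT_MIN, -1, 0, 1, INT_MAX]
BIGINT_SAMPLES = [None, BIGINT_MIN, INT_MIN - 1, INT_MIN, -1, 0, 1,
                  INT_MAX, INT_MAX + 1, 2**32, BIGINT_MAX]


def agree(rewrite) -> bool:
    """True if `rewrite` keeps exactly the same pairs as the widened condition."""
    return all(rewrite(a, b) == widened(a, b)
               for a, b in product(INT_SAMPLES, BIGINT_SAMPLES))


if __name__ == "__main__":
    print(agree(unwrapped))           # True
    print(agree(unwrapped_wrapping))  # False: 0 would match 2**32
```

The second check suggests why `try_cast` is used rather than a plain narrowing cast: under the assumed wrap-around semantics, `0` would match `4294967296`, a pair that the widened condition rejects.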
   
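   After this PR, the `t2` side keeps its bucketed read (8 out of 8 buckets) and needs no `Exchange`; only `t3` is shuffled, into 8 partitions to match `t2`'s buckets: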
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- SortMergeJoin [i#6L], [try_cast(i#29 as bigint)], Inner
      :- Sort [i#6L ASC NULLS FIRST], false, 0
      :  +- Filter isnotnull(i#6L)
      :     +- FileScan parquet spark_catalog.default.t2[i#6L] Batched: true, Bucketed: true, SelectedBucketsCount: 8 out of 8
      +- Sort [try_cast(i#29 as bigint) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(try_cast(i#29 as bigint), 8), ENSURE_REQUIREMENTS, [plan_id=132]
            +- Filter isnotnull(i#29)
               +- FileScan parquet spark_catalog.default.t3[i#29] Batched: true, Bucketed: false (disabled by query planner)
   ```
   
   ### Why are the changes needed?
   To avoid unnecessary shuffles on bucketed tables and improve query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Unit test.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-40609][SQL] Unwrap cast in the join condition to unlock bucketed read [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #44120:
URL: https://github.com/apache/spark/pull/44120#issuecomment-1837105605

   Retest this please


Re: [PR] [SPARK-40609][SQL] Unwrap cast in the join condition to unlock bucketed read [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde closed pull request #44120: [SPARK-40609][SQL] Unwrap cast in the join condition to unlock bucketed read
URL: https://github.com/apache/spark/pull/44120
