Posted to issues@spark.apache.org by "Rob Russo (Jira)" <ji...@apache.org> on 2024/01/18 23:05:00 UTC

[jira] [Commented] (SPARK-45282) Join loses records for cached datasets

    [ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808393#comment-17808393 ] 

Rob Russo commented on SPARK-45282:
-----------------------------------

Is it possible that this also affects Spark 3.3.2? I have an application that has been running on Spark 3.3.2 with AQE enabled. When I upgraded to 3.5.0, I immediately ran into the issue in this ticket. However, when I looked more closely, I found that for one particular type of report the issue was still present even after rolling back to 3.3.2 with AQE enabled.

Either way, on 3.3.2 or 3.5.0, disabling AQE fixed the problem.
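
For reference, a minimal sketch of that workaround, written in the same key/value form as the reproduction settings quoted below. It assumes the setting is supplied at application start (for example in spark-defaults.conf or via --conf). Only the first line corresponds to what is reported above; the commented-out line is an untested, narrower alternative suggested by the ticket description.

{code}
# turn adaptive query execution off entirely
spark.sql.adaptive.enabled false
# untested assumption: keep AQE on but stop it from changing the partitioning of cached plans
# spark.sql.optimizer.canChangeCachedPlanOutputPartitioning false
{code}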

> Join loses records for cached datasets
> --------------------------------------
>
>                 Key: SPARK-45282
>                 URL: https://issues.apache.org/jira/browse/SPARK-45282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.0
>         Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or databricks 13.3
>            Reporter: koert kuipers
>            Assignee: Emil Ejbyfeldt
>            Priority: Blocker
>              Labels: CorrectnessBug, correctness, pull-request-available
>             Fix For: 3.4.2
>
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is not present on spark 3.3.1.
> it only shows up in a distributed environment. i cannot replicate it in a unit test. however i did get it to show up on a hadoop cluster, on kubernetes, and on databricks 13.3
> the issue is that records are dropped when two cached dataframes are joined. it seems that in spark 3.4.1 some Exchanges are dropped from the query plan as an optimization, while in spark 3.3.1 these Exchanges are still present. it seems to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on a distributed cluster the settings needed are:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 1000000).toDS().map(i => UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
>   left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
>   .toDF("key", "value1")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of left1 " + left1.count())
> val right1 = right
>   .toDF("key", "value2")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " +  left1.join(right1, "key").count()) // this gives incorrect result{code}
> this produces the following output:
> {code:java}
> number of left 1000000
> number of right 1000000
> number of (left join right) 1000000
> number of left1 1000000
> number of right1 1000000
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on settings and cluster size etc.
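> to check whether the Exchanges were dropped, the physical plan of the failing join can be printed. this is a minimal sketch, assuming the left1 and right1 dataframes from the reproduction above are still in scope in the same session (the names joined, expected and actual are introduced here for illustration only):
> {code:java}
> val joined = left1.join(right1, "key")
> // print the logical and physical plans; look for Exchange nodes under the join
> joined.explain(true)
> // both sides are built from the same persisted keys, and random UUIDs are
> // effectively unique, so the join should return as many rows as left1 has
> val expected = left1.count()
> val actual = joined.count()
> println(s"expected $expected rows, got $actual")
> {code}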
>  


