Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2020/07/02 00:21:00 UTC
[jira] [Commented] (SPARK-32148) LEFT JOIN generating
non-deterministic and unexpected result (regression in Spark 3.0)
[ https://issues.apache.org/jira/browse/SPARK-32148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17149754#comment-17149754 ]
Jungtaek Lim commented on SPARK-32148:
--------------------------------------
Looking into it. This does look like a real problem (and if so, it should be a blocker), but I want to confirm it first.
> LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)
> --------------------------------------------------------------------------------------
>
> Key: SPARK-32148
> URL: https://issues.apache.org/jira/browse/SPARK-32148
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Michael
> Priority: Major
>
> When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT JOINs now output unexpected results.
> Below is a minimal example to run in {{spark-shell}} to demonstrate this. In it there are 3 events on the left side of the join and two on the right.
> The expected output contains two matching pairs plus one left-side item that has no match on the right, which should therefore be output with the right-side fields set to {{NULL}}. The join condition is that the IDs must match and the event times must be at most 30 seconds apart.
> {code:scala}
> import spark.implicits._
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.expr
> import org.apache.spark.sql.streaming.OutputMode
> import java.sql.Timestamp
> import java.util.UUID
> // Structure of left and right data items
> case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
> case class RightEntry(eventTime: Timestamp, id: String, name: String)
> // Some test data
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> val rightData = Vector(
>   RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
>   RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
> )
> // Write test data that we will stream from later (random output directories; alternatively, we could delete the directories after each run)
> val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
> spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
> val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
> spark.createDataset(rightData).write.format("parquet").save(rightFilePath)
> // Read data from Parquet as stream
> val leftStream = spark.readStream
>   .schema(Encoders.product[LeftEntry].schema)
>   .parquet(leftFilePath)
>   .withWatermark("eventTime", "2 minutes")
> val rightStream = spark.readStream
>   .schema(Encoders.product[RightEntry].schema)
>   .parquet(rightFilePath)
>   .withWatermark("eventTime", "4 minutes")
> // Define Join
> val joinExpression = expr(
>   s"""
>      |leftStream.id = rightStream.id AND
>      |leftStream.eventTime BETWEEN
>      |  rightStream.eventTime - INTERVAL 30 seconds AND
>      |  rightStream.eventTime + INTERVAL 30 seconds
>    """.stripMargin
> )
> val joinedData = leftStream.as("leftStream")
>   .join(
>     rightStream.as("rightStream"),
>     joinExpression,
>     "left"
>   )
> // Run query
> val query = joinedData.writeStream
>   .format("memory")
>   .queryName("myQuery")
>   .outputMode(OutputMode.Append())
>   .start()
> query.processAllAvailable()
> // Print results
> spark
>   .table(query.name)
>   .show(truncate = false)
> {code}
> When this is executed with Spark 2.4.6, the result is as expected and deterministic:
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|abc|has no join partner|null               |null|null| ← as expected
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
> When running the same code snippet with Spark 3.0.0, the result is non-deterministically one of these two:
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 00:00:00|abc|joined with A|null               |null|null| ← this entry was already joined with "A" above,
> +-------------------+---+-------------+-------------------+----+----+   but is now here once more without its right join side
> {noformat}
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 01:00:00|abc|joined with B|null               |null|null| ← this entry was already joined with "B" above,
> +-------------------+---+-------------+-------------------+----+----+   but is now here once more without its right join side
> {noformat}
> ... with {{"has no join partner"}} completely missing, and instead one of the actually joinable left-side items repeated without the right-side fields.
> ----
> If the input data is modified so that the non-joinable event also has a different ID, Spark 3.0 generates the correct output:
> {code:scala}
> // [...]
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "ddd", "has no join partner"),
>   //                                                   ↑↑↑ changed
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> // [...]
> {code}
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|ddd|has no join partner|null               |null|null|
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
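For reference, the expected result in the description can be reproduced without Spark. The following is only a minimal sketch in plain Scala of batch-style LEFT JOIN semantics for the reported condition (equal IDs, event times at most 30 seconds apart, bounds inclusive as with {{BETWEEN}}). The names {{ExpectedLeftJoin}}, {{expectedLeftJoin}}, and {{spuriousNullRows}} are hypothetical helpers introduced here for illustration; they are not part of Spark and do not model watermarks or state eviction.

```scala
import java.sql.Timestamp

// Same shapes as the case classes in the report.
case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
case class RightEntry(eventTime: Timestamp, id: String, name: String)

object ExpectedLeftJoin {
  // Hypothetical helper (not a Spark API): left outer join where IDs must be
  // equal and event times differ by at most maxGapMillis (inclusive).
  def expectedLeftJoin(
      left: Seq[LeftEntry],
      right: Seq[RightEntry],
      maxGapMillis: Long = 30000L): Seq[(LeftEntry, Option[RightEntry])] =
    left.flatMap { l =>
      val matches = right.filter { r =>
        r.id == l.id &&
          math.abs(l.eventTime.getTime - r.eventTime.getTime) <= maxGapMillis
      }
      // An unmatched left row is emitted exactly once, with an empty right side.
      if (matches.isEmpty) Seq((l, Option.empty[RightEntry]))
      else matches.map(r => (l, Option(r)))
    }

  // Hypothetical check for the reported symptom: left rows that appear both
  // with a match and again with an empty (NULL) right side.
  def spuriousNullRows(rows: Seq[(LeftEntry, Option[RightEntry])]): Seq[LeftEntry] = {
    val matched = rows.collect { case (l, Some(_)) => l }.toSet
    rows.collect { case (l, None) if matched(l) => l }
  }

  // Test data copied from the report.
  val leftData = Vector(
    LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
    LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
    LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
  )
  val rightData = Vector(
    RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
    RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
  )

  def main(args: Array[String]): Unit =
    expectedLeftJoin(leftData, rightData).foreach { case (l, r) =>
      println(s"${l.comment} -> ${r.map(_.name).getOrElse("null")}")
    }
}
```

On the report's test data this yields one row per left entry: "has no join partner" with an empty right side, and the other two paired with "A" and "B" respectively. Applied to rows collected from the Spark 3.0.0 outputs shown above, {{spuriousNullRows}} would flag the duplicated "joined with A" or "joined with B" entry.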
--
This message was sent by Atlassian Jira
(v8.3.4#803005)