You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2020/07/02 00:21:00 UTC

[jira] [Commented] (SPARK-32148) LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)

    [ https://issues.apache.org/jira/browse/SPARK-32148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17149754#comment-17149754 ] 

Jungtaek Lim commented on SPARK-32148:
--------------------------------------

Looking into it. Looks like a problem indeed (if then this should be a blocker) but want to make it sure.

> LEFT JOIN generating non-deterministic and unexpected result (regression in Spark 3.0)
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-32148
>                 URL: https://issues.apache.org/jira/browse/SPARK-32148
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Michael
>            Priority: Major
>
> When upgrading from Spark 2.4.6 to 3.0.0 I found that previously working LEFT JOINs now output unexpected results.
> Below is a minimal example to run in {{spark-shell}} to demonstrate this. In it there are 3 events on the left side of the join and two on the right.
>  The expected output should contain two matching pairs and one item on the left side without a matching right side, so that it should be output with the right side fields being {{NULL}}. The join condition is that event times must be max. 30sec apart and the IDs must match.
> {code:scala}
> import spark.implicits._
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.expr
> import org.apache.spark.sql.streaming.OutputMode
> import java.sql.Timestamp
> import java.util.UUID
> // Structure of left and right data items
> case class LeftEntry(eventTime: Timestamp, id: String, comment: String)
> case class RightEntry(eventTime: Timestamp, id: String, name: String)
> // Some test data
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> val rightData = Vector(
>   RightEntry(Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
>   RightEntry(Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B")
> )
> // Write test data, that we will stream from later (random output directories; alternatively we could delete the directories after each run)
> val leftFilePath = s"/tmp/demo-left-data-${UUID.randomUUID()}"
> spark.createDataset(leftData).write.format("parquet").save(leftFilePath)
> val rightFilePath = s"/tmp/demo-right-data-${UUID.randomUUID()}"
> spark.createDataset(rightData).write.format("parquet").save(rightFilePath)
> // Read data from Parquet as stream
> val leftStream = spark.readStream
>   .schema(Encoders.product[LeftEntry].schema)
>   .parquet(leftFilePath)
>   .withWatermark("eventTime", "2 minutes")
> val rightStream = spark.readStream
>   .schema(Encoders.product[RightEntry].schema)
>   .parquet(rightFilePath)
>   .withWatermark("eventTime", "4 minutes")
> // Define Join
> val joinExpression = expr(
>   s"""
>      |leftStream.id = rightStream.id AND
>      |leftStream.eventTime BETWEEN
>      |  rightStream.eventTime - INTERVAL 30 seconds AND
>      |  rightStream.eventTime + INTERVAL 30 seconds
>     """.stripMargin
> )
> val joinedData = leftStream.as("leftStream")
>   .join(
>     rightStream.as("rightStream"),
>     joinExpression,
>     "left"
>   )
> // Run query
> val query = joinedData.writeStream
>   .format("memory")
>   .queryName("myQuery")
>   .outputMode(OutputMode.Append())
>   .start()
> query.processAllAvailable()
> // Print results
> spark
>   .table(query.name)
>   .show(truncate = false)
> {code}
> When this is executed with Spark 2.4.6, the result is as expected and deterministic:
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|abc|has no join partner|null               |null|null|  ← as expected
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}
> When running the same code snippet with Spark 3.0.0, the result is non-deterministically one of these two:
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 00:00:00|abc|joined with A|null               |null|null|  ← this entry was already joined with "A" above,
> +-------------------+---+-------------+-------------------+----+----+    but is now here once more without it's right join side
> {noformat}
> {noformat}
> +-------------------+---+-------------+-------------------+----+----+
> |eventTime          |id |comment      |eventTime          |id  |name|
> +-------------------+---+-------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A|2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B|2020-01-02 00:59:59|abc |B   |
> |2020-01-02 01:00:00|abc|joined with B|null               |null|null|  ← this entry was already joined with "B" above,
> +-------------------+---+-------------+-------------------+----+----+    but is now here once more without it's right join side
> {noformat}
> ... with {{"has no join partner"}} completely missing, and instead one of the actually joinable left-side items repeated without the right-side fields.
> ----
> In case the input data is modified, so that the non-joinable event additionally has a different ID, then Spark 3.0 generates correct output:
> {code:scala}
> // [...]
> val leftData = Vector(
>   LeftEntry(Timestamp.valueOf("2020-01-01 00:00:00"), "ddd", "has no join partner"),
>                                                     // ↑↑↑ changed
>   LeftEntry(Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
>   LeftEntry(Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B")
> )
> // [...]
> {code}
> {noformat}
> +-------------------+---+-------------------+-------------------+----+----+
> |eventTime          |id |comment            |eventTime          |id  |name|
> +-------------------+---+-------------------+-------------------+----+----+
> |2020-01-02 00:00:00|abc|joined with A      |2020-01-02 00:00:10|abc |A   |
> |2020-01-02 01:00:00|abc|joined with B      |2020-01-02 00:59:59|abc |B   |
> |2020-01-01 00:00:00|ddd|has no join partner|null               |null|null|
> +-------------------+---+-------------------+-------------------+----+----+
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org