You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sören Reichardt (JIRA)" <ji...@apache.org> on 2019/01/08 13:32:00 UTC

[jira] [Updated] (SPARK-26572) Join on distinct column with monotonically_increasing_id produces wrong output

     [ https://issues.apache.org/jira/browse/SPARK-26572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sören Reichardt updated SPARK-26572:
------------------------------------
    Summary: Join on distinct column with monotonically_increasing_id produces wrong output  (was: Join on distinct column with monotonically_increasing_id produced wrong output)

> Join on distinct column with monotonically_increasing_id produces wrong output
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-26572
>                 URL: https://issues.apache.org/jira/browse/SPARK-26572
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.2, 2.4.0
>         Environment: Running on Ubuntu 18.04LTS and Intellij 2018.2.5
>            Reporter: Sören Reichardt
>            Priority: Major
>
> When joining a table with projected monotonically_increasing_id column after calling distinct with another table the operators do not get executed in the right order. 
> Here is a minimal example:
> {code:java}
> import org.apache.spark.sql.{DataFrame, SparkSession, functions}
> object JoinBug extends App {
>   // Spark session setup
>   val session =  SparkSession.builder().master("local[*]").getOrCreate()
>   import session.sqlContext.implicits._
>   session.sparkContext.setLogLevel("error")
>   // Bug in Spark: "monotonically_increasing_id" is pushed down when it shouldn't be. Push down only happens when the
>   // DF containing the "monotonically_increasing_id" expression is on the left side of the join.
>   val baseTable = Seq((1), (1)).toDF("idx")
>   val distinctWithId = baseTable.distinct.withColumn("id", functions.monotonically_increasing_id())
>   val monotonicallyOnRight: DataFrame = baseTable.join(distinctWithId, "idx")
>   val monotonicallyOnLeft: DataFrame = distinctWithId.join(baseTable, "idx")
>   monotonicallyOnLeft.show // Wrong
>   monotonicallyOnRight.show // Ok in Spark 2.2.2 - also wrong in Spark 2.4.0
> }
> {code}
> It produces the following output:
> {code:java}
> Wrong:
> +---+------------+
> |idx| id         |
> +---+------------+
> | 1|369367187456 |
> | 1|369367187457 |
> +---+------------+
> Right:
> +---+------------+
> |idx| id         |
> +---+------------+
> | 1|369367187456 |
> | 1|369367187456 |
> +---+------------+
> {code}
> We assume that the join operator triggers a pushdown of expressions (monotonically_increasing_id in this case) which gets pushed down to be executed before distinct. This produces non-distinct rows with unique id's. However it seems like this behavior only appears if the table with the projected expression is on the left side of the join.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org