Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:35:36 UTC

[jira] [Resolved] (SPARK-17377) Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results

     [ https://issues.apache.org/jira/browse/SPARK-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-17377.
----------------------------------
    Resolution: Incomplete

> Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-17377
>                 URL: https://issues.apache.org/jira/browse/SPARK-17377
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Hanna Mäki
>            Assignee: Davies Liu
>            Priority: Major
>              Labels: bulk-closed
>
> Reproduction:
> 1) Read two Datasets from the same partitioned Parquet file, each with a different filter condition on the partitioning column
> 2) Group each Dataset by a column and aggregate
> 3) Join the aggregated Datasets on the group-by column
> 4) Observe that, in the joined Dataset, the aggregated values from the right Dataset have been replaced with the aggregated values from the left Dataset
> The issue reproduces only when the input Parquet file is partitioned.
> Example:
> {code}
> // Imports for sum(), toDS() and as[...]; `spark` is the active SparkSession
> import org.apache.spark.sql.functions.sum
> import spark.implicits._
> val dataPath = "/your/data/path/"
> case class InputData(id: Int, value: Int, filterColumn: Int)
> val inputDS = Seq(
>   InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1),
>   InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2)).toDS()
> inputDS.show
> // | id|value|filterColumn|
> // |  1|    1|           1|
> // |  2|    2|           1|
> // |  3|    3|           1|
> // |  4|    4|           1|
> // |  1|   10|           2|
> // |  2|   20|           2|
> // |  3|   30|           2|
> // |  4|   40|           2|
> // Write the data partitioned by filterColumn, then read it back
> inputDS.write.partitionBy("filterColumn").parquet(dataPath)
> val dataDF = spark.read.parquet(dataPath)
> case class LeftClass(id: Int, aggLeft: Long)
> case class RightClass(id: Int, aggRight: Long)
> // Filter each side on a different partition value, then aggregate by id
> val leftDS = dataDF.filter("filterColumn = 1").groupBy("id").agg(sum("value") as "aggLeft").as[LeftClass]
> val rightDS = dataDF.filter("filterColumn = 2").groupBy("id").agg(sum("value") as "aggRight").as[RightClass]
> leftDS.show
> // | id|aggLeft|
> // |  1|      1|
> // |  3|      3|
> // |  4|      4|
> // |  2|      2|
> rightDS.show
> // | id|aggRight|
> // |  1|      10|
> // |  3|      30|
> // |  4|      40|
> // |  2|      20|
> val joinedDS = leftDS.join(rightDS, "id")
> // Observed (wrong) output: aggRight repeats aggLeft instead of 10, 30, 40, 20
> joinedDS.show
> // | id|aggLeft|aggRight|
> // |  1|      1|       1|
> // |  3|      3|       3|
> // |  4|      4|       4|
> // |  2|      2|       2|
> {code}
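
For comparison, the result the join in the example above should produce can be worked out without Spark. The following is a minimal plain-Scala sketch that assumes only the eight input rows from the description. The names Record, rows, sumById, and expectedJoin are hypothetical and do not appear in the original report.

```scala
// Plain-Scala sketch (no Spark) of the join result the report expects.
// Record, rows, sumById and expectedJoin are illustrative names only.
case class Record(id: Int, value: Int, filterColumn: Int)

// Same eight rows as inputDS in the description.
val rows = Seq(
  Record(1, 1, 1), Record(2, 2, 1), Record(3, 3, 1), Record(4, 4, 1),
  Record(1, 10, 2), Record(2, 20, 2), Record(3, 30, 2), Record(4, 40, 2)
)

// Sum `value` per `id` for a single value of the partitioning column.
def sumById(filterValue: Int): Map[Int, Long] =
  rows.filter(_.filterColumn == filterValue)
    .groupBy(_.id)
    .map { case (id, rs) => id -> rs.map(_.value.toLong).sum }

val left = sumById(1)   // corresponds to aggLeft
val right = sumById(2)  // corresponds to aggRight

// Inner join on id, yielding (id, aggLeft, aggRight) sorted by id.
val expectedJoin: Seq[(Int, Long, Long)] =
  left.keys.toSeq.sorted.flatMap(id => right.get(id).map(r => (id, left(id), r)))

expectedJoin.foreach(println)
// (1,1,10)
// (2,2,20)
// (3,3,30)
// (4,4,40)
```

In the output reported above, the third column instead repeats the second (1, 2, 3, 4), which is the discrepancy this issue describes.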


