Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/09/02 14:13:20 UTC
[jira] [Commented] (SPARK-17377) Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
[ https://issues.apache.org/jira/browse/SPARK-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458642#comment-15458642 ]
Sean Owen commented on SPARK-17377:
-----------------------------------
Probably related: https://issues.apache.org/jira/browse/SPARK-11757
> Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
> ----------------------------------------------------------------------------------------
>
> Key: SPARK-17377
> URL: https://issues.apache.org/jira/browse/SPARK-17377
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Hanna Mäki
>
> Reproduction:
> 1) Read two Datasets from a partitioned Parquet file with different filter conditions on the partitioning column
> 2) Group by a column and aggregate the two data sets
> 3) Join the aggregated Datasets on the group by column
> 4) Observe that in the joined Dataset, the aggregated values from the right Dataset have been replaced by the aggregated values from the left Dataset
> The issue only reproduces when the input Parquet file is partitioned.
> Example:
> import spark.implicits._
> import org.apache.spark.sql.functions.sum
> val dataPath = "/your/data/path/"
> case class InputData(id: Int, value: Int, filterColumn: Int)
> val inputDS = Seq(
>   InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1),
>   InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2)).toDS()
> inputDS.show
> +---+-----+------------+
> | id|value|filterColumn|
> +---+-----+------------+
> |  1|    1|           1|
> |  2|    2|           1|
> |  3|    3|           1|
> |  4|    4|           1|
> |  1|   10|           2|
> |  2|   20|           2|
> |  3|   30|           2|
> |  4|   40|           2|
> +---+-----+------------+
> inputDS.write.partitionBy("filterColumn").parquet(dataPath)
> val dataDF = spark.read.parquet(dataPath)
> case class LeftClass(id: Int, aggLeft: Long)
> case class RightClass(id: Int, aggRight: Long)
> val leftDS = dataDF.filter("filterColumn = 1").groupBy("id").agg(sum("value") as "aggLeft").as[LeftClass]
> val rightDS = dataDF.filter("filterColumn = 2").groupBy("id").agg(sum("value") as "aggRight").as[RightClass]
> leftDS.show
> +---+-------+
> | id|aggLeft|
> +---+-------+
> |  1|      1|
> |  3|      3|
> |  4|      4|
> |  2|      2|
> +---+-------+
> rightDS.show
> +---+--------+
> | id|aggRight|
> +---+--------+
> |  1|      10|
> |  3|      30|
> |  4|      40|
> |  2|      20|
> +---+--------+
> val joinedDS = leftDS.join(rightDS, "id")
> joinedDS.show
> +---+-------+--------+
> | id|aggLeft|aggRight|
> +---+-------+--------+
> |  1|      1|       1|
> |  3|      3|       3|
> |  4|      4|       4|
> |  2|      2|       2|
> +---+-------+--------+
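> For comparison, the correct result of steps 1-4 can be computed with plain Scala collections, with no Spark involved. This is only an illustrative sketch on the same sample data (the `Row` case class and helper names here are hypothetical, not part of the report); it shows the aggRight values the join should have produced:
>
> ```scala
> // Reference computation of the reproduction steps on the sample data.
> case class Row(id: Int, value: Int, filterColumn: Int)
>
> val input = Seq(
>   Row(1, 1, 1), Row(2, 2, 1), Row(3, 3, 1), Row(4, 4, 1),
>   Row(1, 10, 2), Row(2, 20, 2), Row(3, 30, 2), Row(4, 40, 2))
>
> // Steps 1-2: filter on the partitioning column, then sum value per id.
> val aggLeft  = input.filter(_.filterColumn == 1)
>   .groupBy(_.id).map { case (id, rs) => id -> rs.map(_.value).sum.toLong }
> val aggRight = input.filter(_.filterColumn == 2)
>   .groupBy(_.id).map { case (id, rs) => id -> rs.map(_.value).sum.toLong }
>
> // Step 3: inner join on id, sorted for stable output.
> val joined = aggLeft.toSeq
>   .flatMap { case (id, l) => aggRight.get(id).map(r => (id, l, r)) }
>   .sortBy(_._1)
>
> joined.foreach(println)
> // Expected: aggRight is 10x aggLeft for each id, unlike the joined
> // Spark output above, where aggRight duplicates aggLeft.
> ```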
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org