Posted to issues@spark.apache.org by "Hanna Mäki (JIRA)" <ji...@apache.org> on 2016/09/02 10:06:20 UTC

[jira] [Updated] (SPARK-17377) Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results

     [ https://issues.apache.org/jira/browse/SPARK-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hanna Mäki updated SPARK-17377:
-------------------------------
    Description: 
Reproduction: 

1) Read two Datasets from a partitioned Parquet file, with different filter conditions on the partitioning column
2) Group by a column and aggregate each Dataset
3) Join the aggregated Datasets on the group-by column
4) In the joined Dataset, the aggregated values from the right Dataset have been replaced with the aggregated values from the left Dataset

The issue is reproduced only when the input Parquet file is partitioned.

Example: 

import org.apache.spark.sql.functions.sum
import spark.implicits._

val dataPath = "/your/data/path/"

case class InputData(id: Int, value: Int, filterColumn: Int)

val inputDS = Seq(InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1), InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2)).toDS()

inputDS.show
+---+-----+------------+
| id|value|filterColumn|
+---+-----+------------+
|  1|    1|           1|
|  2|    2|           1|
|  3|    3|           1|
|  4|    4|           1|
|  1|   10|           2|
|  2|   20|           2|
|  3|   30|           2|
|  4|   40|           2|
+---+-----+------------+

inputDS.write.partitionBy("filterColumn").parquet(dataPath)

val dataDF = spark.read.parquet(dataPath)

case class LeftClass(id: Int, aggLeft: Long)

case class RightClass(id: Int, aggRight: Long)

val leftDS = dataDF.filter("filterColumn = 1").groupBy("id").agg(sum("value") as "aggLeft").as[LeftClass]

val rightDS = dataDF.filter("filterColumn = 2").groupBy("id").agg(sum("value") as "aggRight").as[RightClass]

leftDS.show
+---+-------+
| id|aggLeft|
+---+-------+
|  1|      1|
|  3|      3|
|  4|      4|
|  2|      2|
+---+-------+

rightDS.show
+---+--------+
| id|aggRight|
+---+--------+
|  1|      10|
|  3|      30|
|  4|      40|
|  2|      20|
+---+--------+

val joinedDS = leftDS.join(rightDS, "id")
joinedDS.show
+---+-------+--------+
| id|aggLeft|aggRight|
+---+-------+--------+
|  1|      1|       1|
|  3|      3|       3|
|  4|      4|       4|
|  2|      2|       2|
+---+-------+--------+
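For diagnosis, the physical plan can be inspected: if both sides of the join show the same scan/aggregate subtree, the two filtered reads were collapsed into one. The sketch below uses the standard Dataset `explain` API; the `spark.sql.exchange.reuse` setting is an assumption (an internal configuration that may not apply to this ticket), offered only as a hedged workaround to try.

```scala
// If the extended plan shows one shared subtree feeding both join sides,
// the filters on filterColumn were lost when the plans were deduplicated.
joinedDS.explain(true)

// Hedged workaround (assumption, not confirmed for this ticket): disable
// exchange reuse so the planner does not collapse the two near-identical
// subtrees, then rebuild leftDS/rightDS and re-run the join.
spark.conf.set("spark.sql.exchange.reuse", "false")
```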

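As an alternative formulation (a sketch, assuming the same `dataDF` as above), both sums can be computed in a single pass, so no two Datasets share the same source plan and there is nothing for the planner to deduplicate:

```scala
import org.apache.spark.sql.functions.{col, sum, when}

// One groupBy over the unfiltered data; each aggregate only counts the
// rows matching its partition value, via conditional aggregation.
val combinedDF = dataDF.groupBy("id").agg(
  sum(when(col("filterColumn") === 1, col("value"))).as("aggLeft"),
  sum(when(col("filterColumn") === 2, col("value"))).as("aggRight"))
```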
> Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-17377
>                 URL: https://issues.apache.org/jira/browse/SPARK-17377
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Hanna Mäki
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
