Posted to issues@spark.apache.org by "Hanna Mäki (JIRA)" <ji...@apache.org> on 2016/09/02 10:00:28 UTC

[jira] [Created] (SPARK-17377) Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results

Hanna Mäki created SPARK-17377:
----------------------------------

             Summary: Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
                 Key: SPARK-17377
                 URL: https://issues.apache.org/jira/browse/SPARK-17377
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.0.0
            Reporter: Hanna Mäki


Reproduction:

1) Read two Datasets from the same partitioned Parquet file, each with a different filter condition on the partitioning column
2) Group each Dataset by a column and aggregate it
3) Join the two aggregated Datasets on the group-by column

Result: in the joined Dataset, the aggregated values from the right Dataset are replaced with the aggregated values from the left Dataset.

The issue reproduces only when the input Parquet file is partitioned.

Example: 

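// Assumes a SparkSession named `spark`, as provided by spark-shell
import org.apache.spark.sql.functions.sum
import spark.implicits._

val dataPath = "/your/data/path/"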

case class InputData(id: Int, value: Int, filterColumn: Int)

val inputDS = Seq(
  InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1),
  InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2)
).toDS()

inputDS.show
+---+-----+------------+
| id|value|filterColumn|
+---+-----+------------+
|  1|    1|           1|
|  2|    2|           1|
|  3|    3|           1|
|  4|    4|           1|
|  1|   10|           2|
|  2|   20|           2|
|  3|   30|           2|
|  4|   40|           2|
+---+-----+------------+

inputDS.write.partitionBy("filterColumn").parquet(dataPath)

val dataDF = spark.read.parquet(dataPath)

case class LeftClass(id: Int, aggLeft: Long)

case class RightClass(id: Int, aggRight: Long)

val leftDS = dataDF.filter("filterColumn = 1").groupBy("id").agg(sum("value") as "aggLeft").as[LeftClass]

val rightDS = dataDF.filter("filterColumn = 2").groupBy("id").agg(sum("value") as "aggRight").as[RightClass]

leftDS.show
+---+-------+
| id|aggLeft|
+---+-------+
|  1|      1|
|  3|      3|
|  4|      4|
|  2|      2|
+---+-------+

rightDS.show
+---+--------+
| id|aggRight|
+---+--------+
|  1|      10|
|  3|      30|
|  4|      40|
|  2|      20|
+---+--------+


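// Expected: aggRight holds 10, 30, 40, 20 as in rightDS.show; the actual output below repeats aggLeft instead
val joinedDS = leftDS.join(rightDS, "id")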
joinedDS.show
+---+-------+--------+
| id|aggLeft|aggRight|
+---+-------+--------+
|  1|      1|       1|
|  3|      3|       3|
|  4|      4|       4|
|  2|      2|       2|
+---+-------+--------+
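
For comparison, the expected join result can be worked out from the same eight input rows with plain Scala collections, independent of Spark. This is only a sketch to make the expected values explicit; the names Record, aggregate, expectedLeft, expectedRight and expectedJoined are illustrative and not part of the reproduction above.

```scala
// Plain Scala collections only; no Spark involved.
// `Record` is a hypothetical stand-in for InputData, renamed to avoid shadowing it in the shell.
case class Record(id: Int, value: Int, filterColumn: Int)

val records = Seq(
  Record(1, 1, 1), Record(2, 2, 1), Record(3, 3, 1), Record(4, 4, 1),
  Record(1, 10, 2), Record(2, 20, 2), Record(3, 30, 2), Record(4, 40, 2)
)

// Sum `value` per `id` over the rows whose filterColumn equals filterValue.
def aggregate(filterValue: Int): Map[Int, Long] =
  records
    .filter(_.filterColumn == filterValue)
    .groupBy(_.id)
    .map { case (id, rows) => id -> rows.map(_.value.toLong).sum }

val expectedLeft = aggregate(1)   // corresponds to leftDS
val expectedRight = aggregate(2)  // corresponds to rightDS

// Inner join on id, sorted by id for readability.
val expectedJoined: Seq[(Int, Long, Long)] =
  expectedLeft.keys.toSeq.sorted.collect {
    case id if expectedRight.contains(id) => (id, expectedLeft(id), expectedRight(id))
  }

expectedJoined.foreach(println)
// (1,1,10)
// (2,2,20)
// (3,3,30)
// (4,4,40)
```

The third element of each tuple is the value that aggRight should carry in joinedDS; the output above shows the aggLeft value in that column instead.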


