You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/08/18 07:32:21 UTC

[jira] [Assigned] (SPARK-17061) Incorrect results returned following a join of two datasets and a map step where total number of columns >100

     [ https://issues.apache.org/jira/browse/SPARK-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-17061:
------------------------------------

    Assignee: Apache Spark

> Incorrect results returned following a join of two datasets and a map step where total number of columns >100
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17061
>                 URL: https://issues.apache.org/jira/browse/SPARK-17061
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Jamie Hutton
>            Assignee: Apache Spark
>            Priority: Critical
>
> We have hit a consistent bug where we have a dataset with more than 100 columns. I am raising as a blocker because spark is returning the WRONG results rather than erroring, leading to data integrity issues
> I have put together the following test case which will show the issue (it will run in spark-shell). In this example i am joining a dataset with lots of fields onto another dataset. 
> The join works fine and if you show the dataset you will get the expected result. However if you run a map step over the dataset you end up with a strange error where the sequence that is in the right dataset now only contains the last value.
> Whilst this test may seem a rather contrived example, what we are doing here is a very standard analtical pattern. My original code was designed to:
>  - take a dataset of child records
>  - groupByKey up to the parent: giving a Dataset of (ParentID, Seq[Children])
>  - join the children onto the parent by parentID: giving ((Parent),(ParentID,Seq[Children])
>  - map over the result to give a tuple of (Parent,Seq[Children])
> Notes:
> - The issue is resolved by having less fields - as soon as we go <= 100 the integrity issue goes away. Try removing one of the fields from BigCaseClass below
> - The issue will arise based on the total number of fields in the resulting dataset. Below i have a small case class and a big case class, but two case classes of 50 variable would give the same issue
> - the issue occurs where the case class being joined on (on the right) has a case class type. It doesnt occur if you have a Seq[String]
> - If i go back to an RDD for the map step after the join i can workaround the issue, but i lose all the benefits of datasets
> Scala code test case:
>   case class Name(name: String)
>   case class SmallCaseClass (joinkey: Integer, names: Seq[Name])    
>   case class BigCaseClass  (field1: Integer,field2: Integer,field3: Integer,field4: Integer,field5: Integer,field6: Integer,field7: Integer,field8: Integer,field9: Integer,field10: Integer,field11: Integer,field12: Integer,field13: Integer,field14: Integer,field15: Integer,field16: Integer,field17: Integer,field18: Integer,field19: Integer,field20: Integer,field21: Integer,field22: Integer,field23: Integer,field24: Integer,field25: Integer,field26: Integer,field27: Integer,field28: Integer,field29: Integer,field30: Integer,field31: Integer,field32: Integer,field33: Integer,field34: Integer,field35: Integer,field36: Integer,field37: Integer,field38: Integer,field39: Integer,field40: Integer,field41: Integer,field42: Integer,field43: Integer,field44: Integer,field45: Integer,field46: Integer,field47: Integer,field48: Integer,field49: Integer,field50: Integer,field51: Integer,field52: Integer,field53: Integer,field54: Integer,field55: Integer,field56: Integer,field57: Integer,field58: Integer,field59: Integer,field60: Integer,field61: Integer,field62: Integer,field63: Integer,field64: Integer,field65: Integer,field66: Integer,field67: Integer,field68: Integer,field69: Integer,field70: Integer,field71: Integer,field72: Integer,field73: Integer,field74: Integer,field75: Integer,field76: Integer,field77: Integer,field78: Integer,field79: Integer,field80: Integer,field81: Integer,field82: Integer,field83: Integer,field84: Integer,field85: Integer,field86: Integer,field87: Integer,field88: Integer,field89: Integer,field90: Integer,field91: Integer,field92: Integer,field93: Integer,field94: Integer,field95: Integer,field96: Integer,field97: Integer,field98: Integer,field99: Integer)
>       
>     val bigCC=Seq(BigCaseClass(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))
>     
>     val smallCC=Seq(SmallCaseClass(1,Seq(
>         Name("Jamie"), 
>         Name("Ian"),
>         Name("Dave"),
>         Name("Will")
>         )))
>     
>     
>     val bigCCDS = spark.createDataset(spark.sparkContext.parallelize(bigCC))
>     val smallCCDS = spark.createDataset(spark.sparkContext.parallelize(smallCC))
>     
>     val joined_test=bigCCDS.as("A").joinWith(smallCCDS.as("B"),  $"A.field1"===$"B.joinkey", "LEFT")
>     
>     /*This next step is fine - it shows all 4 names:
>      * [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
>      * [1,WrappedArray([Jamie], [Ian], [Dave], [Will])]
>      * */
>     joined_test.show(false)
>     
>     /*This one ends up repeating will - I did the most simple map step possible here
>      * [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
>      * [1,WrappedArray([Will], [Will], [Will], [Will])]
>      * */
>     joined_test.map(identity).show(false)
>     
>     /*This one works because we have less than 100 fields:
>      * [Jamie], [Ian], [Dave], [Will]*/
>     joined_test.map(_._2).show(false)
>     



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org