Posted to issues@spark.apache.org by "Nicholas Brett Marcott (Jira)" <ji...@apache.org> on 2020/12/21 05:28:00 UTC

[jira] [Commented] (SPARK-28125) dataframes created by randomSplit have overlapping rows

    [ https://issues.apache.org/jira/browse/SPARK-28125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17252624#comment-17252624 ] 

Nicholas Brett Marcott commented on SPARK-28125:
------------------------------------------------

This seems like expected, albeit confusing, behavior given that the underlying dataset in your test is non-deterministic: it is randomly generated each time it is evaluated.

As you mentioned, caching the underlying dataset is a workaround, and it is already applied automatically [here|https://github.com/apache/spark/pull/17751/files] in the code for the case where no column is orderable.
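
For concreteness, here is a minimal sketch of that user-level workaround. It assumes a SparkSession named spark (as in a standard PySpark shell); the column setup just mimics the reporter's random ID2 column and the names are illustrative, not taken from the reporter's script:

{code:python}
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Build a DataFrame with a non-deterministic column, similar to the report.
rand_col = udf(lambda: random.randint(0, 49999), IntegerType())
df = spark.range(100000).withColumn("ID2", rand_col())

# The workaround: materialize the DataFrame once so both splits are drawn
# from the same, already-computed rows.
df = df.cache()
df.count()  # force evaluation; without an action the cache is still lazy
train, test = df.randomSplit([0.8, 0.2], seed=42)
{code}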

[~hyukjin.kwon] I see you most recently updated this issue. Has there been any discussion around possible approaches here?
 # Always cache the underlying dataset
 # Only cache if the dataset is non-deterministic. There doesn't seem to be a general way to detect this, e.g. in this case the randomness comes from a UDF (see the sketch after this list).
 # More documentation to warn users
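
Regarding option 2, one related note: since Spark 2.3 a Python UDF can at least be flagged explicitly with asNondeterministic(). A sketch against the reporter's UDF follows; whether randomSplit could actually use that flag to decide when to cache is exactly the open question above:

{code:python}
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Marking the UDF non-deterministic tells the optimizer not to assume that
# repeated evaluations return the same value; it does not by itself change
# how randomSplit plans the two splits.
gen_rand = udf(lambda x: int(np.random.random() * 50000 + 2),
               IntegerType()).asNondeterministic()
{code}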

[~zacharydestefano] could you provide a more realistic example, like the ML-related example you mentioned?

> dataframes created by randomSplit have overlapping rows
> -------------------------------------------------------
>
>                 Key: SPARK-28125
>                 URL: https://issues.apache.org/jira/browse/SPARK-28125
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib, PySpark, Spark Core
>    Affects Versions: 2.4.3
>         Environment: Run with Databricks Runtime 5.3 ML (includes Apache Spark 2.4.0, Scala 2.11)
>  
> More details on the environment: [https://docs.databricks.com/release-notes/runtime/5.3ml.html]
> The python package versions: [https://docs.databricks.com/release-notes/runtime/5.3ml.html#python-libraries]
>            Reporter: Zachary
>            Priority: Major
>
> It appears that randomSplit on a DataFrame creates a separate execution plan for each of the resulting DataFrames, or at least that is the impression I get from reading a few Stack Overflow pages on it: 
> [https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit/38380023#38380023]
> [https://stackoverflow.com/questions/32933143/how-does-sparks-rdd-randomsplit-actually-split-the-rdd/32933366]
>  
> Because of the separate executions, it is easy to create a situation where the DataFrames returned by randomSplit have overlapping rows. If people rely on it to split a dataset into training and test sets, they can easily end up with the same rows in both, which causes a serious problem when running model evaluation. 
>  
> I know that if you call .cache() on the DataFrame before calling .randomSplit then you can be assured that the returned frames have disjoint rows, but this workaround is definitely not obvious. I did not know about this issue and ended up creating improper data sets when doing model training and evaluation. Something should be adjusted in .randomSplit so that, under all circumstances, the returned DataFrames have disjoint rows. 
>  
> Here is a PySpark script I wrote that reproduces the issue and includes the (commented-out) workaround line that fixes it: 
>  
> {code:python}
> import numpy as np
> from pyspark.sql import Row
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
> N = 100000
> ratio1 = 0.85
> ratio2 = 0.15
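> # This UDF ignores its input and returns a fresh random value on every call,
> # so the ID2 column is re-generated each time the plan is evaluated.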
> gen_rand = udf(lambda x: int(np.random.random()*50000 + 2), IntegerType())
> orig_list = list(np.zeros(N))
> rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
> df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
> dfA = df.withColumn("ID2", gen_rand(df['ID']))
> dfA = dfA.select("ID2").distinct()
> dfA_els = dfA.rdd.map(lambda x: x['ID2']).collect()
> print("This confirms that if you look at the parent Dataframe, the ID2 col has unqiue values")
> print("Num rows parent DF: {}".format(len(dfA_els)))
> print("num unique ID2 vals: {}".format(len(set(dfA_els))))
> #dfA = dfA.cache() #Uncommenting this line does fix the issue
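> # Without the cache, each split re-executes the plan above (including the UDF),
> # so the two splits sample from different random ID2 values and can overlap.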
> df1, df2 = dfA.randomSplit([ratio2, ratio1])
> df1_ids = set(df1.rdd.map(lambda x: x['ID2']).distinct().collect())
> df2_ids = set(df2.rdd.map(lambda x: x['ID2']).distinct().collect())
> num_inter = len(df1_ids.intersection(df2_ids))
> print()
> print("Number common IDs between the two splits: {}".format(num_inter))
> print("(should be zero if randomSplit is working as expected)")
> {code}


