Posted to issues@spark.apache.org by "Zachary (JIRA)" <ji...@apache.org> on 2019/06/20 17:34:00 UTC

[jira] [Created] (SPARK-28125) dataframes created by randomSplit have overlapping rows

Zachary created SPARK-28125:
-------------------------------

             Summary: dataframes created by randomSplit have overlapping rows
                 Key: SPARK-28125
                 URL: https://issues.apache.org/jira/browse/SPARK-28125
             Project: Spark
          Issue Type: Bug
          Components: ML, MLlib, PySpark, Spark Core
    Affects Versions: 2.4.3
         Environment: Run with Databricks Runtime 5.3 ML (includes Apache Spark 2.4.0, Scala 2.11)

 

More details on the environment: [https://docs.databricks.com/release-notes/runtime/5.3ml.html]

The python package versions: [https://docs.databricks.com/release-notes/runtime/5.3ml.html#python-libraries]
            Reporter: Zachary


It appears that the function randomSplit on a DataFrame creates a separate execution plan for each of the result DataFrames, or at least that's the impression I get from reading a few StackOverflow pages on it: 

[https://stackoverflow.com/questions/38379522/how-does-spark-keep-track-of-the-splits-in-randomsplit/38380023#38380023]

[https://stackoverflow.com/questions/32933143/how-does-sparks-rdd-randomsplit-actually-split-the-rdd/32933366]

 

Because each split is executed separately, it is easy to end up with DataFrames from randomSplit that contain overlapping rows. Anyone relying on it to split a dataset into training and test sets could therefore get the same rows in both, which seriously compromises model evaluation.

 

I know that if you call .cache() on the DataFrame before calling .randomSplit, the returned DataFrames have unique rows, but this workaround is definitely not obvious. I did not know about this issue and ended up creating improper datasets for model training and evaluation. Something should be adjusted in .randomSplit so that, under all circumstances, the returned DataFrames have unique rows.
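To illustrate the mechanism I think is at work, here is a simplified pure-Python model. It is only a sketch and not Spark's implementation: evaluate_plan, split and overlap are hypothetical names, and the assumption is that each split re-evaluates a source whose values come from a non-deterministic function, while both splits reuse the same sampling seed.

```python
import random

def evaluate_plan(n, rng):
    """Stand-in for re-running a lazy plan whose column comes from a random UDF."""
    ids = {rng.randrange(2, 10002) for _ in range(n)}  # distinct random IDs
    return sorted(ids)

def split(rows, lower, upper, seed):
    """Keep rows whose per-row draw falls in [lower, upper), using a fixed seed."""
    rng = random.Random(seed)
    return [r for r in rows if lower <= rng.random() < upper]

def overlap(cache, n=20000, seed=42):
    """Count IDs that land in both splits, with or without materializing first."""
    source = random.Random(0)  # drives the non-deterministic "UDF"
    if cache:
        # Materialize once; both splits see exactly the same rows.
        rows = evaluate_plan(n, source)
        rows_a, rows_b = rows, rows
    else:
        # Each split re-evaluates the plan and sees different rows.
        rows_a = evaluate_plan(n, source)
        rows_b = evaluate_plan(n, source)
    part_a = set(split(rows_a, 0.0, 0.15, seed))
    part_b = set(split(rows_b, 0.15, 1.0, seed))
    return len(part_a & part_b)

if __name__ == "__main__":
    print("overlap with materialized source:", overlap(cache=True))
    print("overlap with re-evaluated source:", overlap(cache=False))
```

In this model the materialized case always gives an overlap of zero, because both splits draw the same per-row random numbers over identical rows and the two ranges do not intersect. The re-evaluated case loses that guarantee because the rows themselves change between evaluations.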

 

Here is a PySpark script I wrote that reproduces the issue; it includes a commented-out line with the temporary workaround:

 
{code:python}
# Assumes `sc` and `sqlContext` are provided by the notebook environment
import numpy as np
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType



N = 100000
ratio1 = 0.85
ratio2 = 0.15
# Non-deterministic UDF: returns a new random integer every time it is evaluated
gen_rand = udf(lambda x: int(np.random.random()*50000 + 2), IntegerType())



# Build a DataFrame with N rows and attach a random ID2 column
orig_list = list(np.zeros(N))
rdd = sc.parallelize(orig_list).map(int).map(lambda x: {'ID': x})
df = sqlContext.createDataFrame(rdd.map(lambda x: Row(**x)))
dfA = df.withColumn("ID2", gen_rand(df['ID']))

dfA = dfA.select("ID2").distinct()

dfA_els = dfA.rdd.map(lambda x: x['ID2']).collect()

print("This confirms that if you look at the parent DataFrame, the ID2 column has unique values")
print("Num rows parent DF: {}".format(len(dfA_els)))
print("num unique ID2 vals: {}".format(len(set(dfA_els))))

# dfA = dfA.cache()  # Uncommenting this line fixes the issue

df1, df2 = dfA.randomSplit([ratio2, ratio1])

df1_ids = set(df1.rdd.map(lambda x: x['ID2']).distinct().collect())
df2_ids = set(df2.rdd.map(lambda x: x['ID2']).distinct().collect())

num_inter = len(df1_ids.intersection(df2_ids))
print()
print("Number common IDs between the two splits: {}".format(num_inter))
print("(should be zero if randomSplit is working as expected)")

{code}
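As an aside, one way to get splits that cannot overlap regardless of how often the source is re-evaluated is to assign each row to a split from a hash of its key, rather than from a random draw. This is a different technique from randomSplit and not a Spark API. The sketch below is plain Python, and the names bucket and assign_split, the use of MD5, and the 15 percent test share are assumptions for illustration only.

```python
import hashlib

def bucket(key, n_buckets=100):
    """Map a key to a stable bucket in [0, n_buckets) using an MD5 digest."""
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return int(digest, 16) % n_buckets

def assign_split(key, test_percent=15):
    """Return 'test' or 'train' for a key; the answer depends only on the key."""
    return "test" if bucket(key) < test_percent else "train"

if __name__ == "__main__":
    keys = range(2, 10002)
    test_ids = {k for k in keys if assign_split(k) == "test"}
    train_ids = {k for k in keys if assign_split(k) == "train"}
    print("test size:", len(test_ids))
    print("train size:", len(train_ids))
    print("common IDs:", len(test_ids & train_ids))
```

Because the assignment is a pure function of the key, a given ID always ends up in the same split, so the two sets are disjoint by construction. The trade-off is that the split sizes only approximate the requested ratio.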


