You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Clément Stenac (JIRA)" <ji...@apache.org> on 2017/11/01 17:45:00 UTC

[jira] [Created] (SPARK-22410) Excessive spill for Pyspark UDF when a row has shrunk

Clément Stenac created SPARK-22410:
--------------------------------------

             Summary: Excessive spill for Pyspark UDF when a row has shrunk
                 Key: SPARK-22410
                 URL: https://issues.apache.org/jira/browse/SPARK-22410
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.0
         Environment: Reproduced on up-to-date master
            Reporter: Clément Stenac
            Priority: Minor


Hi,

The following code processes 900KB of data and outputs around 2MB of data. However, to process it, Spark needs to spill roughly 12 GB of data.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

ss = SparkSession.builder.getOrCreate()

# Create a few lines of data (5 lines).
# Each line is made of a string, and an array of 10000 strings
# Total size of data is around 900 KB

lines_of_file = [ "this is a line" for x in xrange(10000) ]
file_obj = [ "this_is_a_foldername/this_is_a_filename", lines_of_file ]
data = [ file_obj for x in xrange(5) ]

# Make a two-columns dataframe out of it
small_df = ss.sparkContext.parallelize(data).map(lambda x : (x[0], x[1])).toDF(["file", "lines"])

# We then explode the array, so we now have 50000 rows in the dataframe, with 2 columns, the 2nd 
# column now has only "this is a line" as content
exploded = small_df.select("file", explode("lines"))

print("Exploded")
print(exploded.explain())

# Now, just process it with a trivial Pyspark UDF that touches the first column
# (the one which was not an array)

def split_key(s):
    return s.split("/")[1]
split_key_udf = udf(split_key, StringType())

with_filename = exploded.withColumn("filename", split_key_udf("file"))

# As expected, explain plan is very simple (BatchEval -> Explode -> Project -> ScanExisting)
print(with_filename.explain())

# Getting the head will spill around 12 GB of data
print(with_filename.head())
{code}

The spill happens in the HybridRowQueue that is used to merge the part that went through the Python worker and the part that didn't.

The problem comes from the fact that when it is added to the HybridRowQueue, the UnsafeRow has a totalSizeInBytes of ~240000 (seen by adding debug message in HybridRowQueue), whereas, since it's after the explode, the actual size of the row should be in the ~60 bytes range.

My understanding is that the row has retained the size it consumed *prior* to the explode (at that time, the size of each of the 5 rows was indeed ~240000 bytes.

A workaround is to do exploded.cache() before calling the UDF. The fact of going through the InMemoryColumnarTableScan "resets" the wrongful size of the UnsafeRow.

Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org