Posted to issues@spark.apache.org by "Dennis Proppe (JIRA)" <ji...@apache.org> on 2015/05/06 13:48:00 UTC

[jira] [Comment Edited] (SPARK-7116) Intermediate RDD cached but never unpersisted

    [ https://issues.apache.org/jira/browse/SPARK-7116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14530390#comment-14530390 ] 

Dennis Proppe edited comment on SPARK-7116 at 5/6/15 11:47 AM:
---------------------------------------------------------------

*I second the importance of fixing this.*

For our production workloads, this is a serious issue that blocks us from working with large data:

1) It creates non-removable RDDs cached in memory
2) They can't be removed even by deleting the RDD in question in the code
3) The RDDs are up to 10 times bigger than they would be after calling df.cache()

The behaviour can be reproduced easily in the PySpark Shell:

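-------
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Python UDF that returns the length of a string
slen = udf(lambda s: len(s), IntegerType())

# Small two-column DataFrame (3000 rows); sc is the shell's SparkContext
rows = [["a", "hello"], ["b", "goodbye"], ["c", "whatever"]] * 1000
rd = sc.parallelize(rows)
head = ['order', 'word']
df = rd.toDF(head)

# Explicitly cache df and materialize it
df.cache().count()

# Adding a column via the UDF caches an intermediate RDD as a side effect
bigfile = df.withColumn("word_length", slen(df.word))
bigfile.count()
-------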

If you now compare the size of the cached df with that of the cached RDD that appeared automatically for bigfile, you will see that the latter uses about 8x the storage of df, although it has only one more column.
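
As a rough sketch, the same comparison could be made from the shell rather than by eye in the Web UI. The helper below is hypothetical (the name cached_rdd_sizes is mine) and reads Spark's storage report through sc._jsc, a private PySpark attribute, so treat it as an assumption that may not hold across versions rather than a supported API. It only reports sizes; it does not remove anything.

```python
def cached_rdd_sizes(sc):
    """Return (rdd_id, name, bytes_in_memory) for each RDD in Spark's storage report.

    Assumption: sc is a live PySpark SparkContext. sc._jsc is private and
    getRDDStorageInfo() is a developer API on the JVM-side SparkContext.
    """
    infos = sc._jsc.sc().getRDDStorageInfo()
    sizes = [(info.id(), info.name(), info.memSize()) for info in infos]
    # Sort by RDD id so that earlier-created RDDs are listed first
    return sorted(sizes, key=lambda entry: entry[0])


def print_cached_rdd_sizes(sc):
    """Print one line per RDD in the storage report, with its size in MB."""
    for rdd_id, name, mem_bytes in cached_rdd_sizes(sc):
        print("%d\t%s\t%.1f MB" % (rdd_id, name, mem_bytes / (1024.0 * 1024.0)))
```

Calling print_cached_rdd_sizes(sc) after the two count() calls above should list both the cached df and the intermediate RDD, so the two sizes can be compared directly.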


was (Author: dproppe):
*I second the importance of fixing this.*

For our production workloads, this is a serious issue that blocks us from working with large data:

1) It creates non-removable RDDs cached in memory
2) They can't be removed even by deleting the RDD in question in the code
3) The RDDs are up to 10 times bigger than they would be after calling df.cache()

The behaviour can be reproduced easily in the PySpark Shell:


     from pyspark import SQLContext
     from pyspark.sql.types import IntegerType
     from pyspark.sql.functions import udf

     sqlcon = SQLContext(sc)
     slen = udf(lambda s: len(s), IntegerType())

     rows = [["a", "hello"], ["b", "goodbye"], ["c", "whatever"]] * 1000
     rd = sc.parallelize(rows)
     head = ['order', 'word']
     df = rd.toDF(head)
     df.cache().count()

     bigfile = df.withColumn("word_length", slen(df.word))
     bigfile.count()

If you now compare the size of the cached df with that of the cached RDD that appeared automatically for bigfile, you will see that the latter uses about 8x the storage of df, although it has only one more column.

> Intermediate RDD cached but never unpersisted
> ---------------------------------------------
>
>                 Key: SPARK-7116
>                 URL: https://issues.apache.org/jira/browse/SPARK-7116
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 1.3.1
>            Reporter: Kalle Jepsen
>
> In https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala#L233 an intermediate RDD is cached but never unpersisted. It shows up in the 'Storage' section of the Web UI, but cannot be removed. There's already a comment in the source suggesting a 'clean up'. If that cleanup is more involved than simply calling `unpersist`, it probably exceeds my current Scala skills.
> Why that is a problem:
> I'm adding a constant column to a DataFrame of about 20M records resulting from an inner join with {{df.withColumn(colname, ud_func())}}, where {{ud_func}} is simply a wrapped {{lambda: 1}}. Before and after applying the UDF, the DataFrame takes up ~430MB in the cache. The cached intermediate RDD, however, takes up ~10GB(!) of storage, and I know of no way to uncache it.


