Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/08/18 20:16:18 UTC
[jira] [Created] (SPARK-3105) Calling cache() after RDDs are pipelined has no effect in PySpark
Josh Rosen created SPARK-3105:
---------------------------------
Summary: Calling cache() after RDDs are pipelined has no effect in PySpark
Key: SPARK-3105
URL: https://issues.apache.org/jira/browse/SPARK-3105
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.0.0, 1.1.0
Reporter: Josh Rosen
PySpark's PipelinedRDD decides whether to pipeline transformations by checking whether those transformations are pipelinable _at the time that the PipelinedRDD objects are created_ rather than at the time that we invoke actions. This might lead to problems if we call {{cache()}} on an RDD after it's already been used in a pipeline:
{code}
rdd1 = sc.parallelize(range(100)).map(lambda x: x)
rdd2 = rdd1.map(lambda x: 2 * x)
rdd1.cache()
rdd2.collect()
{code}
When I run this code, I'd expect {{cache()}} to break the pipeline and cache the intermediate results, but instead the two transformations are pipelined together in Python, effectively ignoring the {{cache()}}.
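To make the mechanism concrete, here is a minimal toy model of a construction-time pipelining decision. It is only a sketch and not PySpark's implementation: the class {{StaticRDD}} and its attributes ({{parent}}, {{cached_data}}) are hypothetical names invented for illustration, and it needs no SparkContext.

```python
class StaticRDD:
    """Toy model only; names are hypothetical, not PySpark's API."""

    def __init__(self, data=None, prev=None, func=None):
        self.data = data
        self.is_cached = False
        self.cached_data = None
        if prev is None:
            # Source node: nothing to pipeline with.
            self.parent, self.func = None, None
        elif prev.func is not None and not prev.is_cached:
            # Decision made NOW, at construction time: fuse with the
            # parent's function and read directly from the grandparent.
            prev_func = prev.func
            self.parent = prev.parent
            self.func = lambda x: func(prev_func(x))
        else:
            # Parent is a source or already cached: keep a stage boundary.
            self.parent, self.func = prev, func

    def map(self, func):
        return StaticRDD(prev=self, func=func)

    def cache(self):
        self.is_cached = True
        return self

    def collect(self):
        if self.cached_data is not None:
            return self.cached_data
        if self.parent is None:
            result = list(self.data)
        else:
            result = [self.func(x) for x in self.parent.collect()]
        if self.is_cached:
            self.cached_data = result
        return result


# cache() called after the child was created: rdd2 already bypasses rdd1.
late1 = StaticRDD(data=range(100)).map(lambda x: x)
late2 = late1.map(lambda x: 2 * x)
late1.cache()
late_result = late2.collect()

# cache() called before the child was created: the boundary is kept.
early1 = StaticRDD(data=range(100)).map(lambda x: x).cache()
early2 = early1.map(lambda x: 2 * x)
early_result = early2.collect()
```

In this model, {{late1.cached_data}} stays {{None}} after the action because {{late2}} never reads from {{late1}}, while {{early1.cached_data}} is populated. Both pipelines return the same values; only the caching behavior differs.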
Note that {{cache()}} works properly if we call it before performing any other transformations on the RDD:
{code}
rdd1 = sc.parallelize(range(100)).map(lambda x: x).cache()
rdd2 = rdd1.map(lambda x: 2 * x)
rdd2.collect()
{code}
This works as expected and caches {{rdd1}}.
To fix this, I think we should dynamically decide whether to pipeline when we actually perform actions, rather than statically deciding when we create the RDDs.
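As a rough illustration of deciding at action time, the toy model below keeps every RDD's real parent and only fuses functions inside {{collect()}}, stopping at the first cached ancestor. This is a sketch under stated assumptions, not a proposed patch: {{DynamicRDD}} and its attributes are hypothetical names, and real PySpark would also have to account for checkpointing and the JVM side.

```python
class DynamicRDD:
    """Toy model only; names are hypothetical, not PySpark's API."""

    def __init__(self, data=None, prev=None, func=None):
        self.data = data
        self.parent = prev      # always keep the true lineage
        self.func = func
        self.is_cached = False
        self.cached_data = None

    def map(self, func):
        return DynamicRDD(prev=self, func=func)

    def cache(self):
        self.is_cached = True
        return self

    def collect(self):
        if self.cached_data is not None:
            return self.cached_data
        if self.parent is None:
            result = list(self.data)
        else:
            # Decide at action time: walk up the lineage, fusing functions
            # until we reach a source or an ancestor marked as cached.
            funcs = [self.func]
            node = self.parent
            while node.parent is not None and not node.is_cached:
                funcs.append(node.func)
                node = node.parent
            result = []
            for x in node.collect():
                # Apply the oldest function first.
                for f in reversed(funcs):
                    x = f(x)
                result.append(x)
        if self.is_cached:
            self.cached_data = result
        return result


rdd1 = DynamicRDD(data=range(100)).map(lambda x: x)
rdd2 = rdd1.map(lambda x: 2 * x)
rdd1.cache()  # called after rdd2 exists, as in the failing example
result = rdd2.collect()
```

Here {{rdd1.cached_data}} is populated even though {{cache()}} was called after {{rdd2}} was created, because the cached flag is only read when the action runs.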
We should also add tests for this.
(Thanks to [~tdas] for pointing out this issue.)
--
This message was sent by Atlassian JIRA
(v6.2#6252)