Posted to user@spark.apache.org by Ben Leslie <be...@benno.id.au> on 2016/09/09 12:01:32 UTC

pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK

Hi,

I'm trying to understand if there is any difference in correctness
between rdd.persist(pyspark.StorageLevel.MEMORY_ONLY) and
rdd.persist(pyspark.StorageLevel.MEMORY_AND_DISK).

I can see that there may be differences in performance, but my
expectation was that using either would result in the same behaviour.
However, that is not what I'm seeing in practice.

Specifically I have some code like:

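    import json
    import pyspark

    # sc is an existing SparkContext; input_files is the input path(s)
    text_lines = sc.textFile(input_files)
    records = text_lines.map(json.loads)  # parse each line as JSON
    records.persist(pyspark.StorageLevel.MEMORY_ONLY)
    count = records.count()  # first action; this is what populates the cache
    records.unpersist()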

When I do not use persist at all the 'count' variable contains the
correct value.
When I use persist with pyspark.StorageLevel.MEMORY_AND_DISK, I also
get the correct, expected value.
However, if I use persist with no argument (or
pyspark.StorageLevel.MEMORY_ONLY) then the value of 'count' is too
small.

In all cases the script completes without errors (or warnings, as far
as I can tell).

I'm using Spark 2.0.0 on an AWS EMR cluster.

It appears that the executors may not have enough memory to store all
the RDD partitions in memory only. However, I thought that in this case
it would fall back to regenerating them from the parent RDD, rather
than providing the wrong answer.
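
To make that expectation concrete, here is a toy model in plain Python of how a MEMORY_ONLY-style cache is documented to behave: a partition that does not fit is simply not cached and is rebuilt from its parent when needed. This is only a sketch and not Spark code; ToyMemoryOnlyCache, parent_partition, count_with_capacity and the "25 records per partition" data are all hypothetical names and values made up for illustration.

```python
from typing import Callable, Dict, List


class ToyMemoryOnlyCache:
    """Toy model of MEMORY_ONLY semantics; not Spark's implementation."""

    def __init__(self, compute: Callable[[int], List[int]], capacity: int):
        self.compute = compute    # rebuilds a partition from its "parent"
        self.capacity = capacity  # max number of partitions kept in memory
        self.store: Dict[int, List[int]] = {}
        self.uncached_computes = 0

    def partition(self, pid: int) -> List[int]:
        if pid in self.store:
            return self.store[pid]        # cache hit
        data = self.compute(pid)          # miss: rebuild from the parent
        if len(self.store) < self.capacity:
            self.store[pid] = data        # fits: keep it for next time
        else:
            self.uncached_computes += 1   # no room: still return full data
        return data

    def count(self, num_partitions: int) -> int:
        return sum(len(self.partition(p)) for p in range(num_partitions))


def parent_partition(pid: int) -> List[int]:
    # Hypothetical parent data: 25 records per partition.
    return list(range(pid * 25, (pid + 1) * 25))


def count_with_capacity(capacity: int) -> int:
    cache = ToyMemoryOnlyCache(parent_partition, capacity)
    cache.count(4)         # first pass caches whatever fits
    return cache.count(4)  # second pass mixes cache hits and recomputation
```

In this model the count is the same whether the cache holds none, some, or all of the four partitions; only the amount of recomputation changes. That is the behaviour I was expecting from the real thing.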

Is this the expected behaviour? It seems a little difficult to work
with in practice.

Cheers,

Ben

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK

Posted by Josh Rosen <jo...@databricks.com>.
Based on Ben's helpful error description, I managed to reproduce this bug
and found the root cause:

There's a bug in MemoryStore's PartiallySerializedBlock class: it doesn't
close a serialization stream before attempting to deserialize its
serialized values, causing it to miss any data stored in the serializer's
internal buffers (which can happen with KryoSerializer, which was
automatically being used to serialize RDDs of byte arrays). I've reported
this as https://issues.apache.org/jira/browse/SPARK-17491 and have submitted
https://github.com/apache/spark/pull/15043 to fix this (I'm still planning
to add more tests to that patch).
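
As a rough analogy for this class of bug (plain Python, not the MemoryStore code itself), the sketch below writes records through a buffered stream and then reads the underlying bytes. If the stream is not flushed or closed first, anything still sitting in the writer's internal buffer is invisible to the reader, so the record count comes out too small without any error being raised. The function name count_written_records and the newline-delimited JSON format are assumptions made for the example.

```python
import io
import json


def count_written_records(records, flush_first: bool) -> int:
    """Write records through a buffered stream, then count what the sink holds."""
    sink = io.BytesIO()
    # The writer keeps data in an internal buffer until it is flushed or closed.
    writer = io.BufferedWriter(sink, buffer_size=8192)
    for rec in records:
        writer.write(json.dumps(rec).encode("utf-8") + b"\n")
    if flush_first:
        writer.flush()  # push buffered bytes down to the sink before reading
    # Read back from the sink; each complete record ends with a newline.
    return sink.getvalue().count(b"\n")


records = [{"id": i} for i in range(100)]
complete = count_written_records(records, flush_first=True)
truncated = count_written_records(records, flush_first=False)
print(complete, truncated)
```

With the flush, all 100 records are visible to the reader; without it, the reader sees fewer, which mirrors the silently low count() in the original report.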

On Fri, Sep 9, 2016 at 10:37 AM Josh Rosen <jo...@databricks.com> wrote:

> cache() / persist() is definitely *not* supposed to affect the result of
> a program, so the behavior that you're seeing is unexpected.
>
> I'll try to reproduce this myself by caching in PySpark under heavy memory
> pressure, but in the meantime the following questions will help me to debug:
>
>    - Does this only happen in Spark 2.0? Have you successfully run the
>    same workload with correct behavior on an earlier Spark version, such as
>    1.6.x?
>    - How accurately does your example code model the structure of your
>    real code? Are you calling cache()/persist() on an RDD which has been
>    transformed in Python or are you calling it on an untransformed input RDD
>    (such as the RDD returned from sc.textFile() / sc.hadoopFile())?

Re: pyspark persist MEMORY_ONLY vs MEMORY_AND_DISK

Posted by Josh Rosen <jo...@databricks.com>.
cache() / persist() is definitely *not* supposed to affect the result of a
program, so the behavior that you're seeing is unexpected.

I'll try to reproduce this myself by caching in PySpark under heavy memory
pressure, but in the meantime the following questions will help me to debug:

   - Does this only happen in Spark 2.0? Have you successfully run the same
   workload with correct behavior on an earlier Spark version, such as 1.6.x?
   - How accurately does your example code model the structure of your real
   code? Are you calling cache()/persist() on an RDD which has been
   transformed in Python or are you calling it on an untransformed input RDD
   (such as the RDD returned from sc.textFile() / sc.hadoopFile())?

