Posted to user@spark.apache.org by Aaron Olson <aa...@shopify.com> on 2014/03/11 18:11:18 UTC

Pyspark Memory Woes

Dear Sparkians,

We are working on a system to do relational modeling on top of Spark, all
done in pyspark. While we've been learning a lot about Spark internals so
far, we're currently running into memory issues and wondering how best to
profile to fix them. Here are our symptoms:

   - We're operating on data sets up to 80G in size of uncompressed JSON,
   66 million records in the largest one.
   - Sometimes we're joining those large data sets, but cardinality never
   exceeds 66 million (unless we've got a bug somewhere).
   - We're seeing various OOM problems: sometimes python takes all
   available mem, sometimes we OOM with no heap space left, and occasionally
   OOM with GC overhead limit exceeded.
   - Sometimes we also see what looks like a single huge message sent over
   the wire that exceeds the wire format limitations.
   - Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes. It
   seems like we should have more than enough to do this comfortably.
   - We're trying to isolate specific steps now, but every time it errors,
   we're partitioning (i.e. partitionByKey is involved somewhere).

We've been instrumenting according to the monitoring and tuning docs, but
we're a bit at a loss for where we're going wrong. We suspect poor/wrong
partitioning on our part somehow. With that in mind, some questions:

   - How exactly is partitioning information propagated? It looks like
   within a pipelined RDD the parent partitioning is preserved throughout
   unless we either specifically repartition or go through a reduce. We're
   splitting as much as we can on maps and letting reduces happen normally. Is
   that good practice?
   - When doing e.g. partitionByKey, does an entire partition get sent to
   one worker process?
   - When does Spark stream data? Are there easy ways to sabotage the
   streaming? Are there any knobs for us to twiddle here?
   - Is there any way to specify the number of shuffles for a given reduce
   step?
   - How can we get better insight into what our workers are doing,
   specifically around moving data in and out of python land?
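
A note on the partitioning questions: in PySpark the call is
rdd.partitionBy(numPartitions), and reduceByKey and join also accept a
numPartitions argument, which sets how many partitions the shuffle produces.
A hash partitioner sends every record with the same key to the same
partition, and each partition is handled by a single task, so one very common
key can produce one very large partition however many partitions are
requested. The sketch below simulates that in plain Python. It is not Spark's
implementation: partition_for, partition_sizes, the crc32-based hash and the
sample data are all made up for illustration.

```python
import zlib
from collections import Counter


def partition_for(key, num_partitions):
    """Map a key to a partition index with a stable hash (illustrative stand-in)."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def partition_sizes(records, num_partitions):
    """Count how many (key, value) records land in each partition."""
    sizes = Counter(partition_for(key, num_partitions) for key, _ in records)
    return [sizes.get(i, 0) for i in range(num_partitions)]


# Hypothetical data: 10,000 records, 90% of which share one "hot" key.
records = [("hot", i) for i in range(9000)]
records += [(f"key-{i}", i) for i in range(1000)]

for num_partitions in (8, 64):
    sizes = partition_sizes(records, num_partitions)
    print(num_partitions, "partitions: largest holds",
          max(sizes), "of", sum(sizes), "records")
```

Raising the partition count spreads the distinct keys more thinly but leaves
the hot key's 9,000 records together. If the real data looks like this, the
skewed key is probably worth checking before tuning memory settings.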

I realise it's hard to troubleshoot in the absence of code but any test
case we have would be contrived. We're collecting more metrics and trying
to reason about what might be happening, but any guidance at this point
would be most helpful.

Thanks!

-- 
Aaron Olson
Data Engineer, Shopify

Re: Pyspark Memory Woes

Posted by Aaron Olson <aa...@shopify.com>.
Hi Sandy,

We are, yes. I strongly suspect we're not partitioning our data properly,
but maybe 1.5G is simply too small for our workload. I'll bump the executor
memory and see if we get better results.

It seems we should be setting it to (SPARK_WORKER_MEMORY + pyspark memory)
/ # of concurrent applications, but is there any advice on how to balance
memory between executors and pyspark, or does it depend too much on the
workload? How do we know if we're getting the most bang for our buck, so to
speak?

Thanks again,

-Aaron


On Tue, Mar 11, 2014 at 6:06 PM, Sandy Ryza <sa...@cloudera.com> wrote:

> Are you aware that you get an executor (and the 1.5GB) per machine, not
> per core?


-- 
Aaron Olson
Data Engineer, Shopify

Re: Pyspark Memory Woes

Posted by Sandy Ryza <sa...@cloudera.com>.
Are you aware that you get an executor (and the 1.5GB) per machine, not per
core?
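
To make the point above concrete, here is a rough back-of-the-envelope
calculation using the figures quoted in this thread. It assumes one executor
per node, reads "24 or 32 cores" as a per-node count, and assumes the
spark.cores.max=256 limit is spread evenly over the 46 nodes. None of these
assumptions is confirmed in the thread, so treat the output as an estimate
only.

```python
# Figures quoted in the thread.
nodes = 46
executor_heap_gb = 1.5   # spark.executor.memory=1500m, rounded as in the thread
dataset_gb = 80          # largest uncompressed JSON input
cores_max = 256          # spark.cores.max from SPARK_JAVA_OPTS

# One executor (and one heap) per machine, not per core.
total_heap_gb = nodes * executor_heap_gb
print(f"total executor heap: {total_heap_gb:.0f} GB "
      f"for up to {dataset_gb} GB of input")

# Assumption: the core limit is spread evenly, so this many tasks share a heap.
tasks_per_node = cores_max / nodes
heap_per_task_mb = executor_heap_gb * 1024 / tasks_per_node
print(f"about {tasks_per_node:.1f} concurrent tasks per node, "
      f"roughly {heap_per_task_mb:.0f} MB of heap each")

# Upper bound on sharing: every core on a node running a task at once.
for cores in (24, 32):
    per_core_mb = executor_heap_gb * 1024 / cores
    print(f"{cores} cores per node -> about {per_core_mb:.0f} MB of heap per core")
```

Under these assumptions the combined JVM heap is smaller than the largest
uncompressed data set. The Python worker processes also use memory outside
the JVM heap, so this heap figure does not cover them.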



On Tue, Mar 11, 2014 at 12:52 PM, Aaron Olson <aa...@shopify.com> wrote:

> Hi Sandy,
>
> We're configuring that with the SPARK_JAVA_OPTS environment variable in
> $SPARK_HOME/spark-worker-env.sh like this:
>
> # JAVA OPTS
> export SPARK_JAVA_OPTS="-Dspark.ui.port=0 -Dspark.default.parallelism=1024
> -Dspark.cores.max=256 -Dspark.executor.memory=1500m
> -Dspark.worker.timeout=500 -Dspark.akka.timeout=500 "
>
> Does that value seem low to you?
>
> -Aaron

Re: Pyspark Memory Woes

Posted by Aaron Olson <aa...@shopify.com>.
Hi Sandy,

We're configuring that with the SPARK_JAVA_OPTS environment variable in
$SPARK_HOME/spark-worker-env.sh like this:

# JAVA OPTS
export SPARK_JAVA_OPTS="-Dspark.ui.port=0 -Dspark.default.parallelism=1024
-Dspark.cores.max=256 -Dspark.executor.memory=1500m
-Dspark.worker.timeout=500 -Dspark.akka.timeout=500 "

Does that value seem low to you?

-Aaron




On Tue, Mar 11, 2014 at 3:08 PM, Sandy Ryza <sa...@cloudera.com> wrote:

> Hi Aaron,
>
> When you say "Java heap space is 1.5G per worker, 24 or 32 cores across
> 46 nodes. It seems like we should have more than enough to do this
> comfortably.", how are you configuring this?
>
> -Sandy


-- 
Aaron Olson
Data Engineer, Shopify

Re: Pyspark Memory Woes

Posted by Sandy Ryza <sa...@cloudera.com>.
Hi Aaron,

When you say "Java heap space is 1.5G per worker, 24 or 32 cores across 46
nodes. It seems like we should have more than enough to do this
comfortably.", how are you configuring this?

-Sandy


On Tue, Mar 11, 2014 at 10:11 AM, Aaron Olson <aa...@shopify.com> wrote:

> Dear Sparkians,
>
> We are working on a system to do relational modeling on top of Spark, all
> done in pyspark. While we've been learning a lot about Spark internals so
> far, we're currently running into memory issues and wondering how best to
> profile to fix them. Here are our symptoms:
>
>    - We're operating on data sets up to 80G in size of uncompressed JSON,
>    66 million records in the largest one.
>    - Sometimes we're joining those large data sets, but cardinality never
>    exceeds 66 million (unless we've got a bug somewhere).
>    - We're seeing various OOM problems: sometimes python takes all
>    available mem, sometimes we OOM with no heap space left, and occasionally
>    OOM with GC overhead limit exceeded.
>    - Sometimes we also see what looks like a single huge message sent
>    over the wire that exceeds the wire format limitations.
>    - Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes.
>    It seems like we should have more than enough to do this comfortably.
>    - We're trying to isolate specific steps now, but every time it
>    errors, we're partitioning (i.e. partitionByKey is involved somewhere).
>
> We've been instrumenting according to the monitoring and tuning docs, but
> we're a bit at a loss for where we're going wrong. We suspect poor/wrong
> partitioning on our part somehow. With that in mind, some questions:
>
>    - How exactly is partitioning information propagated? It looks like
>    within a pipelined RDD the parent partitioning is preserved throughout
>    unless we either specifically repartition or go through a reduce. We're
>    splitting as much as we can on maps and letting reduces happen normally. Is
>    that good practice?
>    - When doing e.g. partitionByKey, does an entire partition get sent to
>    one worker process?
>    - When does Spark stream data? Are there easy ways to sabotage the
>    streaming? Are there any knobs for us to twiddle here?
>    - Is there any way to specify the number of shuffles for a given
>    reduce step?
>    - How can we get better insight into what our workers are doing,
>    specifically around moving data in and out of python land?
>
> I realise it's hard to troubleshoot in the absence of code but any test
> case we have would be contrived. We're collecting more metrics and trying
> to reason about what might be happening, but any guidance at this point
> would be most helpful.
>
> Thanks!
>
> --
> Aaron Olson
> Data Engineer, Shopify
>