Posted to user@spark.apache.org by eshishki <it...@gmail.com> on 2013/10/15 16:29:56 UTC

pyspark memory usage

Hello,

I set up spark-0.8.0-incubating-bin-cdh4 on a 5-node cluster.

I limited SPARK_WORKER_MEMORY to 2g, and there are 4 cores per node, so I
expected the total memory consumption by Spark to be 512 MB + 2 GB.
The Spark web UI shows Memory: 10.0 GB Total, 0.0 B Used.

Then I tried to run the simple wordcount.py from the examples on an HDFS
file whose size is 11 GB.
Spark launched 4 workers per node and did not limit their total size to
2 GB; top showed RES consumption of about 750 MB, and then:
Out of memory: Kill process 26336 (python) score 97 or sacrifice child
Killed process 26336, UID 500, (python) total-vm:969696kB,
anon-rss:782976kB, file-rss:196kB

and in the logs

INFO cluster.ClusterTaskSetManager: Loss was due to org.apache.spark.SparkException
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:167)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:173)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:116)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:193)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
        at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

So I could not finish the job. Yes, Spark resubmitted the task, but it kept
getting OOM-killed.

Against a smaller file, Spark did fine.

So the question is: why does Spark not limit its memory accordingly, and how
can I analyze files larger than RAM with it?

Thanks.

Re: pyspark memory usage

Posted by Shangyu Luo <ls...@gmail.com>.
I think I ran into a similar problem before. In my opinion, we cannot set the
memory for each Python process on the worker nodes; their memory consumption
is determined by Spark. I have tried setting SPARK_DAEMON_MEMORY and
SPARK_DAEMON_JAVA_OPTS, because the Python process on the worker nodes is
named daemon.py, but these settings cannot control the memory of the Python
processes either.
I agree with what Matei said, and I think you can also increase the memory
for the JVM. In addition, since you have four workers per node, I think
SPARK_WORKER_MEMORY applies to each worker, so the total memory consumption
should be 512 MB + 8 GB.



-- 

Shangyu, Luo
Department of Computer Science
Rice University


Re: pyspark memory usage

Posted by eshishki <it...@gmail.com>.
> I'm not sure I understand your problem -- is it that Spark used *less*
> memory than the 2 GB?
The JVM used memory as expected (512 MB), but the Python workers were not
bounded by the 2 GB limit; they grew in RES size until the OOM killer came
into play.

Yes, I can change the parallelism level for map and reduce, but think about
it: I cannot have one script for every file. I would have to know its size
in advance so I could estimate the memory consumption per worker and adjust
the parallelism level accordingly.
What is the worker memory limit for?
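
In principle the split count could be derived from the input size at run
time instead of being hard-coded per file. A rough, untested sketch (the
"hadoop fs -du -s" call and the 64 MB-per-task target are just assumptions
on my part):

import subprocess
from operator import add
from pyspark import SparkContext

def hdfs_file_size(path):
    # Ask HDFS for the file size in bytes via the command line; on
    # Hadoop 2 / CDH4, `hadoop fs -du -s <path>` prints "<bytes> <path>".
    out = subprocess.check_output(["hadoop", "fs", "-du", "-s", path])
    return int(out.split()[0])

TARGET_BYTES_PER_TASK = 64 * 1024 * 1024  # aim for ~64 MB of input per map task

sc = SparkContext("spark://master:7077", "PythonWordCount")  # placeholder master URL
path = "hdfs:///path/to/input.txt"                           # placeholder input path

# Ask for enough splits that every task reads a bounded chunk regardless of
# the file size, so the per-task (and per-Python-worker) memory stays
# roughly constant.
num_splits = max(1, hdfs_file_size(path) // TARGET_BYTES_PER_TASK)

counts = (sc.textFile(path, num_splits)
            .flatMap(lambda line: line.split(' '))
            .map(lambda word: (word, 1))
            .reduceByKey(add, num_splits))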




-- 
Евгений

Re: pyspark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
Hi there,

I'm not sure I understand your problem -- is it that Spark used *less* memory than the 2 GB? That out of memory message seems to be from your operating system, so maybe there were other things using RAM on that machine, or maybe Linux is configured to kill tasks quickly when the memory gets full.

When you're running PySpark, the underlying Spark process is unlikely to use a ton of memory unless you cache stuff, because it just pipes data to Python. However, it does launch one Python process per core, and those may be using a fair amount of RAM. If you'd like to decrease the memory usage per process, try changing the reduceByKey(add) in wordcount.py to use more reduce tasks by passing a second parameter to it (for example, reduceByKey(add, 20) will have it use 20 parallel tasks). Likewise you can set a "minimum number of tasks" value on the textFile call; it's 1 by default but you can increase it to, say, 100, to make sure that there are at least 100 map tasks. This will make the load per task smaller.
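
For example, the two changes to wordcount.py would look roughly like this (a
sketch against the 0.8 PySpark API; the master URL and paths are
placeholders):

from operator import add
from pyspark import SparkContext

sc = SparkContext("spark://master:7077", "PythonWordCount")  # placeholder master URL

# The second argument asks for at least 100 map tasks (splits), so each task
# reads a smaller slice of the 11 GB input.
lines = sc.textFile("hdfs:///path/to/input.txt", 100)

counts = (lines.flatMap(lambda line: line.split(' '))
               .map(lambda word: (word, 1))
               .reduceByKey(add, 20))  # 20 parallel reduce tasks, as suggested above

output = counts.collect()  # wordcount.py then prints each (word, count) pair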

Matei
