You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Ameet Kini <am...@gmail.com> on 2014/04/10 04:48:06 UTC

shuffle memory requirements

val hrdd = sc.hadoopRDD(..)
val res =
hrdd.partitionBy(myCustomPartitioner).reduceKey(..).mapPartitionsWithIndex(
some code to save those partitions )

I'm getting OutOfMemoryErrors on the read side of partitionBy shuffle. My
custom partitioner generates over 20,000 partitions, so there are 20,000
tasks reading the shuffle files. On problems with low partitions (~ 1000),
the job completes successfully.

On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
cores per executor and brought it down to 3, and I still get
OutOfMemoryErrors at 20,000 partitions. I have
spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since
I am not caching any RDDs.

Do those config params look reasonable for my shuffle size ? I'm not sure
what to increase - shuffle.memoryFraction or the memory that the reduce
tasks get. The latter I am guessing is whatever is left after giving
storage.memoryFraction and shuffle.memoryFraction.

Thanks,
Ameet

Re: shuffle memory requirements

Posted by Andrew Ash <an...@andrewash.com>.
Hi Maddenpj,

Right now the best estimate I've heard for the open file limit is that
you'll need the square of the largest partition count in your dataset.

I filed a ticket to log the ulimit value when it's too low at
https://issues.apache.org/jira/browse/SPARK-3750

On Mon, Sep 29, 2014 at 6:20 PM, maddenpj <ma...@gmail.com> wrote:

> Hey Ameet,
>
> Thanks for the info, I'm running into the same issue myself and my last
> attempt crashed and my ulimit was 16834. I'm going to up it and try again,
> but yea I would like to know the best practice for computing this. Can you
> talk about the worker nodes, what are their specs? At least 45 gigs of
> memory and 6 cores?
>
> Also I left my worker at the default memory size (512m I think) and gave
> all
> of the memory to the executor. It was my understanding that the worker just
> spawns the executor but all the work is done in the executor. What was your
> reasoning for using 24G on the worker?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/shuffle-memory-requirements-tp4048p15375.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: shuffle memory requirements

Posted by maddenpj <ma...@gmail.com>.
Hey Ameet,

Thanks for the info, I'm running into the same issue myself and my last
attempt crashed and my ulimit was 16834. I'm going to up it and try again,
but yea I would like to know the best practice for computing this. Can you
talk about the worker nodes, what are their specs? At least 45 gigs of
memory and 6 cores?

Also I left my worker at the default memory size (512m I think) and gave all
of the memory to the executor. It was my understanding that the worker just
spawns the executor but all the work is done in the executor. What was your
reasoning for using 24G on the worker?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/shuffle-memory-requirements-tp4048p15375.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: shuffle memory requirements

Posted by Ameet Kini <am...@gmail.com>.
A typo - I mean't section 2.1.2.5 "ulimit and nproc" of
https://hbase.apache.org/book.html

Ameet


On Fri, Apr 11, 2014 at 10:32 AM, Ameet Kini <am...@gmail.com> wrote:

>
> Turns out that my ulimit settings were too low. I bumped  up and the job
> successfully completes. Here's what I have now:
>
> $ ulimit -u   // for max user processes
> 81920
> $ ulimit -n  // for open files
> 81920
>
> I was thrown off by the OutOfMemoryError into thinking it is Spark running
> out of memory in the shuffle stage. My previous settings were 1024 for
> both, and while that worked for shuffle on small jobs (10s of gigs), it'd
> choke on the large ones. It would be good to document these in the tuning /
> configuration section. Something like section 2.5 "ulimit and nproc" of
> https://hbase.apache.org/book.html
>
>
> 14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
> java.lang.OutOfMemoryError: unable to create new native thread
>     at java.lang.Thread.start0(Native Method)
>     at java.lang.Thread.start(Thread.java:657)
>     at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
>     at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> maxBytesInFlight: 50331648, minRequest: 10066329
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> maxBytesInFlight: 50331648, minRequest: 10066329
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> Getting 0 non-zero-bytes blocks out of 7773 blocks
> 14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> Started 0 remote gets in  1 ms
> 14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
> 14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
> java.io.IOException: DFSOutputStream is closed
>     at
> org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
>     at
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
>     at
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
>     at
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
>     at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
>     at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
>     at
> org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
>     at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
>     at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
>     at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
>     at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
>     at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
>     at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:416)
>     at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:679)
>
> Thanks,
> Ameet
>
>
> On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini <am...@gmail.com> wrote:
>
>> val hrdd = sc.hadoopRDD(..)
>> val res =
>> hrdd.partitionBy(myCustomPartitioner).reduceKey(..).mapPartitionsWithIndex(
>> some code to save those partitions )
>>
>> I'm getting OutOfMemoryErrors on the read side of partitionBy shuffle. My
>> custom partitioner generates over 20,000 partitions, so there are 20,000
>> tasks reading the shuffle files. On problems with low partitions (~ 1000),
>> the job completes successfully.
>>
>> On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
>> each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
>> cores per executor and brought it down to 3, and I still get
>> OutOfMemoryErrors at 20,000 partitions. I have
>> spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since
>> I am not caching any RDDs.
>>
>> Do those config params look reasonable for my shuffle size ? I'm not sure
>> what to increase - shuffle.memoryFraction or the memory that the reduce
>> tasks get. The latter I am guessing is whatever is left after giving
>> storage.memoryFraction and shuffle.memoryFraction.
>>
>> Thanks,
>> Ameet
>>
>>
>>
>

Re: shuffle memory requirements

Posted by Ameet Kini <am...@gmail.com>.
Turns out that my ulimit settings were too low. I bumped  up and the job
successfully completes. Here's what I have now:

$ ulimit -u   // for max user processes
81920
$ ulimit -n  // for open files
81920

I was thrown off by the OutOfMemoryError into thinking it is Spark running
out of memory in the shuffle stage. My previous settings were 1024 for
both, and while that worked for shuffle on small jobs (10s of gigs), it'd
choke on the large ones. It would be good to document these in the tuning /
configuration section. Something like section 2.5 "ulimit and nproc" of
https://hbase.apache.org/book.html


14/04/10 15:16:58 WARN DFSClient: DataStreamer Exception
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:657)
    at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.initDataStreaming(DFSOutputStream.java:408)
    at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:488)
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, minRequest: 10066329
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 0 non-zero-bytes blocks out of 7773 blocks
14/04/10 15:16:58 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote gets in  1 ms
14/04/10 15:16:58 INFO Ingest: Working on partition 6215 with rep = (3, 3)
14/04/10 15:16:58 ERROR Executor: Exception in task ID 21756
java.io.IOException: DFSOutputStream is closed
    at
org.apache.hadoop.hdfs.DFSOutputStream.isClosed(DFSOutputStream.java:1265)
    at
org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:1715)
    at
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:1694)
    at
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1778)
    at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:66)
    at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:99)
    at
org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1240)
    at org.apache.hadoop.io.MapFile$Writer.close(MapFile.java:300)
    at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:189)
    at geotrellis.spark.cmd.Ingest$$anonfun$4.apply(Ingest.scala:176)
    at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
    at org.apache.spark.rdd.RDD$$anonfun$2.apply(RDD.scala:466)
    at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:679)

Thanks,
Ameet


On Wed, Apr 9, 2014 at 10:48 PM, Ameet Kini <am...@gmail.com> wrote:

> val hrdd = sc.hadoopRDD(..)
> val res =
> hrdd.partitionBy(myCustomPartitioner).reduceKey(..).mapPartitionsWithIndex(
> some code to save those partitions )
>
> I'm getting OutOfMemoryErrors on the read side of partitionBy shuffle. My
> custom partitioner generates over 20,000 partitions, so there are 20,000
> tasks reading the shuffle files. On problems with low partitions (~ 1000),
> the job completes successfully.
>
> On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
> each executor gets 21 GB (SPARK_MEM = 21 GB). I have tried assigning 6
> cores per executor and brought it down to 3, and I still get
> OutOfMemoryErrors at 20,000 partitions. I have
> spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2 since
> I am not caching any RDDs.
>
> Do those config params look reasonable for my shuffle size ? I'm not sure
> what to increase - shuffle.memoryFraction or the memory that the reduce
> tasks get. The latter I am guessing is whatever is left after giving
> storage.memoryFraction and shuffle.memoryFraction.
>
> Thanks,
> Ameet
>
>
>