Posted to issues@spark.apache.org by "Sergey Zhemzhitsky (JIRA)" <ji...@apache.org> on 2018/11/19 09:30:00 UTC

[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

     [ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---------------------------------------
    Attachment: run1-noparams-dominator-tree.png

> Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26114
>                 URL: https://issues.apache.org/jira/browse/SPARK-26114
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.2, 2.3.2, 2.4.0
>         Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>         Attachments: run1-noparams-dominator-tree.png
>
>
> Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_.
> The error happens when specifying a relatively small number of partitions in the _coalesce_ call.
> *How to reproduce?*
> # Start spark-shell
> {code:bash}
> spark-shell \
>   --num-executors=5 \
>   --executor-cores=2 \
>   --master=yarn \
>   --deploy-mode=client \
>   --conf spark.executor.memory=1g \
>   --conf spark.dynamicAllocation.enabled=false \
>   --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
> {code}
> Please note the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap memory usage currently seems to be the only way to control the amount of memory used for transferring shuffle data.
> Also note that the total number of cores allocated for the job is 5 executors x 2 cores = 10.
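> A minimal sketch (not part of the original reproduction, and relying on Netty-internal API) to verify on the executors that the flag actually took effect:
> {code:scala}
> // Sanity-check sketch: confirm that -Dio.netty.noUnsafe=true disabled
> // Netty's unsafe-based off-heap allocations in the executor JVMs.
> // io.netty.util.internal.PlatformDependent is Netty-internal API.
> import io.netty.util.internal.PlatformDependent
> sc.parallelize(1 to 1, 1)
>   .map(_ => PlatformDependent.hasUnsafe())
>   .collect()  // expected: Array(false) when the flag is in effect
> {code}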
> # Then generate some test data
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.hadoop.io.compress._
> import org.apache.commons.lang._
> import org.apache.spark._
> // generate 100M records of sample data
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 100000)
>     .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
>               new Text(RandomStringUtils.randomAlphanumeric(1024))))
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
> {code}
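> A rough estimate of the uncompressed volume this produces (assuming the ~3-byte keys and 1 KB values from the snippet above):
> {code:scala}
> // Back-of-the-envelope size estimate for the generated dataset:
> // 1000 partitions * 100000 records per partition = 100M records.
> val records = 1000L * 100000L
> val approxBytes = records * (3 + 1024)  // ~3-byte key + 1 KB value
> println(f"${approxBytes / math.pow(1024, 3)}%.1f GiB")  // prints ~95.6 GiB
> {code}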
> # Run the sample job
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.spark._
> import org.apache.spark.storage._
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(10, false)
>   .count
> {code}
> Note that the number of partitions in the _coalesce_ call (10) is equal to the total number of cores allocated to the job, so each coalesced task has to iterate over 1000 / 10 = 100 sorted shuffle partitions.
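> As a hedged workaround sketch (it sidesteps rather than fixes the leak): passing _shuffle = true_ to _coalesce_ inserts an extra shuffle, so each task consumes a single shuffle partition instead of accumulating state across many parent partitions:
> {code:scala}
> // Workaround sketch, not a fix for the underlying PartitionedPairBuffer leak:
> // coalesce(10, shuffle = true) behaves like repartition(10), adding an
> // extra shuffle so each of the 10 tasks reads one shuffle partition
> // instead of iterating over 100 sorted parent partitions in a row.
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(10, shuffle = true)
>   .count
> {code}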


