You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Yang Cao <cy...@gmail.com> on 2017/02/16 07:18:59 UTC

Re: physical memory usage keep increasing for spark app on Yarn

Hi Pavel!

Sorry for late. I just do some investigation in these days with my colleague. Here is my thought: from spark 1.2, we use Netty with off-heap memory to reduce GC during shuffle and cache block transfer. In my case, if I try to increase the memory overhead enough. I will get the Max direct buffer exception. When Netty do block transferring, there will be five threads by default to grab the data chunk to target executor. In my situation, one single chunk is too big to fit into the buffer. So gc won’t help here. My final solution is to do another repartition before the repartition(1). Just to make 10x times more partitions than original’s. In this way, I can reduce the size of each chunk Netty transfer. 

Also I want to say that it’s not a good choice to repartition a big dataset into single file. This extremely unbalanced scenario is kind of waste your compute resources. 

I don’t know whether my explanation is right. Plz correct me if you find any issue.THX

Best,
Yang
>  On 2017年1月23日, at 18:03, Pavel Plotnikov <pa...@team.wrike.com> wrote:
> 
> Hi Yang!
> 
> I don't know exactly why this happen, but i think GC can't work to fast enough or size of data with additional objects created while computations to big for executor. 
> And i found that this problem only if you make some data manipulations. You can cache you data first, after that, write in one partiton.
> For example  
> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.cache()
> or
> dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)
> val dropDF = spark.read.parquet(temppath)
> and then
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> Best,
> 
> On Sun, Jan 22, 2017 at 12:31 PM Yang Cao <cybeater@gmail.com <ma...@gmail.com>> wrote:
> Also, do you know why this happen? 
> 
>> On 2017年1月20日, at 18:23, Pavel Plotnikov <pavel.plotnikov@team.wrike.com <ma...@team.wrike.com>> wrote:
>> 
> 
>> Hi Yang,
>> i have faced with the same problem on Mesos and to circumvent this issue i am usually increase partition number. On last step in your code you reduce number of partitions to 1, try to set bigger value, may be it solve this problem.
>> 
>> Cheers,
>> Pavel
>> 
>> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybeater@gmail.com <ma...@gmail.com>> wrote:
>> Hi all,
>> 
>> I am running a spark application on YARN-client mode with 6 executors (each 4 cores and executor memory = 6G and Overhead = 4G, spark version: 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until get killed by node manager; and give out the info that tells me to boost spark.yarn.excutor.memoryOverhead. I know that this param mainly control the size of memory allocated off-heap. But I don’t know when and how the spark engine will use this part of memory. Also increase that part of memory not always solve my problem. sometimes works sometimes not. It trends to be useless when the input data is large.
>> 
>> FYI, my app’s logic is quite simple. It means to combine the small files generated in one single day (one directory one day) into a single one and write back to hdfs. Here is the core code:
>> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = ${ts.day}").coalesce(400)
>> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>> The source file may have hundreds to thousands level’s partition. And the total parquet file is around 1to 5 gigs. Also I find that in the step that shuffle reading data from different machines, The size of shuffle read is about 4 times larger than the input size, Which is wired or some principle I don’t know. 
>> 
>> Anyway, I have done some search myself for this problem. Some article said that it’s on the direct buffer memory (I don’t set myself). Some article said that people solve it with more frequent full GC. Also I find one people on SO with very similar situation: http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn <http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn>
>> This guy claimed that it’s a bug with parquet but comment questioned him. People in this mail list may also receive an email hours ago from blondowski who described this problem while writing json: http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none <http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none>
>> 
>> So it looks like to be common question for different output format. I hope someone with experience about this problem could make an explanation about this issue. Why this happen and what is a reliable way to solve this problem. 
>> 
>> Best,
>> 
>>