Posted to user@spark.apache.org by Anubhav Agarwal <an...@gmail.com> on 2015/10/22 21:39:17 UTC

Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

Try this if you have not:-

// Coalesce to a small number of partitions so the write produces only a few output files
DataFrame dF = sqlContext.createDataFrame(dRDD, schema).coalesce(3);

// Serialize concurrent writers that append to the same location
synchronized (auditLock) {
    dF.write().mode(SaveMode.Append).parquet("path to folder to save");
}


On Thu, Sep 10, 2015 at 1:53 PM, Umesh Kacha <um...@gmail.com> wrote:

> Hi Yong, it would be great if you could share insight on how to merge the
> small part files generated by a Spark job. Regards,
> Umesh
> On Sep 4, 2015 6:03 AM, "java8964" <ja...@hotmail.com> wrote:
>
>> If you use Spark SQL, then just adjust the configuration "spark.sql.shuffle.partitions"
>> to 9000.
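>>
>> A minimal sketch of setting it, assuming a SQLContext named sqlContext; the
>> same property can also be passed on spark-submit with --conf:
>>
>> // Raise the number of shuffle partitions before running the query that shuffles
>> sqlContext.setConf("spark.sql.shuffle.partitions", "9000");
>>
>> // Equivalent at submit time:
>> // spark-submit --conf spark.sql.shuffle.partitions=9000 ...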
>>
>> Yong
>>
>> ------------------------------
>> From: umesh.kacha@gmail.com
>> Date: Fri, 4 Sep 2015 02:13:40 +0530
>> Subject: Re: How to avoid executor time out on yarn spark while dealing
>> with large shuffle skewed data?
>> To: java8964@hotmail.com
>> CC: m_albert137@yahoo.com; sandy.ryza@cloudera.com; user@spark.apache.org
>>
>> Hi Yong, you mentioned you create 9000 partitions; how do you do that? I
>> have a DataFrame which contains huge data and I want to increase its
>> partitions so that individual nodes take less time to read shuffle data. In
>> my case one node is processing/reading about 5 GB of shuffle data. I tried
>> df.coalesce(1000) but it does not seem to be working; I don't see 1000
>> partitions anywhere in the Spark UI, I just see 12 tasks, which means 12
>> partitions, right? I am using Spark SQL DataFrames with the 1.4.0 release.
>> Please guide.
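>>
>> One likely reason: DataFrame.coalesce only merges existing partitions and
>> cannot raise the partition count above what the DataFrame already has (12
>> here), while repartition triggers a shuffle and can increase it. A minimal
>> sketch, assuming a DataFrame named df:
>>
>> // coalesce never increases the partition count: 12 partitions stay 12
>> DataFrame unchanged = df.coalesce(1000);
>>
>> // repartition shuffles the data and can produce 1000 partitions
>> DataFrame wide = df.repartition(1000);
>> System.out.println(wide.javaRDD().partitions().size()); // 1000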
>>
>> On Sun, Aug 30, 2015 at 3:04 PM, Umesh Kacha <um...@gmail.com>
>> wrote:
>>
>> Hi Yong, thanks much for the response. How do I merge the part-xxxxx files
>> later? Do I need to use HDFS APIs to merge, or Spark APIs? Increasing
>> spark.sql.shuffle.partitions to 300 to reduce shuffling and the load within
>> each partition, and then merging at the end, seems good enough for me for
>> now. Please guide me on how to merge. Thanks much.
>> On Aug 27, 2015 4:18 AM, "java8964" <ja...@hotmail.com> wrote:
>>
>> You should enlarge "spark.sql.shuffle.partitions", which will leave each
>> partition with less data. This configuration controls how many partitions
>> the shuffle will generate.
>>
>> The small output files are fine for HDFS, as you can always merge them
>> later, after your job is done.
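>>
>> A minimal sketch of one way to do that later merge in Spark itself, assuming
>> hypothetical paths "output/path" and "output/path_merged": read the small
>> part files back, coalesce to a few partitions, and rewrite.
>>
>> // Read back the many small part files, then rewrite them as a few larger files
>> DataFrame parts = sqlContext.read().parquet("output/path");
>> parts.coalesce(10).write().mode(SaveMode.Overwrite).parquet("output/path_merged");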
>>
>> Using SQL has one drawback: you cannot set the number of partitions
>> dynamically across stages. If you use the Scala API, you can control it
>> separately at each place.
>>
>> About the skewed data: if one partition (or one "group by" key) has too
>> much data to be processed within any reasonable heap size, then you need to
>> redesign your solution.
>>
>> For example, if you group by "country" for the whole population of the
>> earth, then "China" or "India" will cause trouble for you. So add "age" as
>> part of the group key to lower the data per group, then merge/aggregate
>> after that, as sketched below. You have no other solution.
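>>
>> A minimal sketch of that two-step aggregation, with hypothetical names (a
>> table "people" with columns country and age, and a simple count):
>>
>> // Step 1: pre-aggregate on the finer key so no single group is huge
>> DataFrame partial = hiveContext.sql(
>>     "SELECT country, age, COUNT(*) AS cnt FROM people GROUP BY country, age");
>> partial.registerTempTable("partial_counts");
>>
>> // Step 2: merge the partial results down to the key you actually want
>> DataFrame merged = hiveContext.sql(
>>     "SELECT country, SUM(cnt) AS cnt FROM partial_counts GROUP BY country");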
>>
>> Yong
>>
>> ------------------------------
>> From: umesh.kacha@gmail.com
>> Date: Thu, 27 Aug 2015 00:21:32 +0530
>> Subject: Re: How to avoid executor time out on yarn spark while dealing
>> with large shuffle skewed data?
>> To: java8964@hotmail.com
>> CC: m_albert137@yahoo.com; user@spark.apache.org; sandy.ryza@cloudera.com
>>
>> Hi Yong,
>>
>> Thanks much for the detailed explanation. I already tried everything you
>> suggested. My problem is that the Hive queries I am using involve a group
>> by on many fields, which causes lots of shuffling; since the data is
>> skewed, an executor eventually hits its physical memory limit and YARN
>> kills it. I tried the following approaches, and all of them ended with an
>> executor hitting the memory limit and being killed by YARN:
>>
>> -40 executors with 20g memory 4 core each
>> -60 executors with 10g memory 2 core each
>> -30 executors with 30g memory 6/7/8 cores each
>>
>> I think the problem is the low number of partitions I am using, and I have
>> a reason for it. When we call hiveContext.sql("insert into xyz
>> partitions...group by a,b,c,d,e,f,g"), by default Spark uses 200 tasks and
>> hence 200 partitions (please correct me if I am wrong). This number is
>> controlled by spark.sql.shuffle.partitions=200; I have set it to 15 to
>> avoid creating tens of thousands of files each day when my Spark job
>> finishes. If we create that many small files, the HDFS namenode will hit
>> memory limits, since each small file needs around 150 bytes of namenode
>> memory.
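>>
>> One way to keep both goals, sketched here with hypothetical names (a table
>> xyz, grouping columns a and b, a HiveContext named hiveContext, and an
>> output path "output/path"): run the shuffle with many partitions for memory
>> headroom, then coalesce only for the final write so the file count stays
>> small.
>>
>> // Plenty of shuffle partitions while aggregating, so no single task holds too much data
>> hiveContext.setConf("spark.sql.shuffle.partitions", "2000");
>> DataFrame result = hiveContext.sql("SELECT a, b, COUNT(*) AS cnt FROM xyz GROUP BY a, b");
>>
>> // Shrink the partition count only for the write, to keep the number of output files small
>> result.coalesce(15).write().mode(SaveMode.Append).parquet("output/path");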
>>
>> So what do you suggest now? My code is very simple: I spawn a thousand
>> Runnable workers using an ExecutorService from the driver. Each Runnable
>> executes hiveContext.sql("insert into tablename partitions"), and I have
>> four such queries. Please guide me; I have been stuck on this problem for
>> the last two months, and the Spark job runs halfway and fails. I am
>> scheduling the job in yarn-client mode for now; once it runs with no
>> problems I am thinking of moving it to Autosys using yarn-cluster mode.
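>>
>> For reference, a minimal sketch of that driver-side pattern, with
>> hypothetical names (a fixed pool of 4 threads and a list insertQueries
>> holding the four statements; hiveContext is assumed to be a field of the
>> driver class):
>>
>> // Bounded pool so only a few insert queries run concurrently
>> ExecutorService pool = Executors.newFixedThreadPool(4);
>> for (final String query : insertQueries) {
>>     pool.submit(new Runnable() {
>>         @Override
>>         public void run() {
>>             hiveContext.sql(query);  // each Runnable fires one insert ... group by statement
>>         }
>>     });
>> }
>> pool.shutdown();  // then wait for completion before stopping the driver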
>>
>> On Wed, Aug 26, 2015 at 4:54 AM, java8964 <ja...@hotmail.com> wrote:
>>
>> You are using Spark 1.4.1, so the very latest. The Spark web UI gives you
>> lots of information to help you with the GC problem.
>>
>> Here is some of my experience to share with you:
>>
>> 1) When an executor dies as part of a job, it is a bad sign that something
>> is wrong. Even though a new executor will be recreated for retrying, your
>> job's performance will suffer a lot, and the job may fail eventually. From
>> my experience, an executor most likely dies due to:
>>     a) JVM OOM
>>     b) the Linux kernel out-of-memory killer
>>     c) being killed by YARN
>>  The Spark worker log should give you some information, especially the
>> "Executor processor EXIT code", which indicates the reason why it died.
>>
>> 2) You should enable GC output in the executor JVMs, which will record the
>> GC activity in the executor's logs (the relevant JVM flags are sketched
>> below, after this list). If you see very frequent "Full GC" events, it
>> means your heap is too small for the data being processed.
>>
>> 3) In the Spark web UI, every task shows how many records were assigned to
>> it to process, like the following:
>>
>> [inline screenshot of the Spark UI task table, with a Read Size / Records
>> column per task]
>>
>> See the Read Size/Records column above; I found this very useful. It gives
>> you an idea of the average number of records per partition to be processed
>> by the current task.
>>
>> In the above example, my executors are given a 20G heap with 4 cores each,
>> and run without any problem.
>> So I know that on average each task is allocated about 3M records, and 4
>> cores per executor means a 20G heap is fine for processing 3M x 4 = 12M
>> records. If I want to make it faster, I can try adding one more core per
>> executor, which may run faster but may also cause trouble. You should also
>> check the GC output to see whether you have more room in memory.
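>>
>> A minimal sketch of how the GC output mentioned in point 2 can be turned on,
>> assuming the conf is built programmatically (the same property can also be
>> passed with --conf on spark-submit); the flags themselves are standard
>> HotSpot options, not Spark-specific ones:
>>
>> SparkConf conf = new SparkConf()
>>     // Log every GC event with details and timestamps in the executor logs
>>     .set("spark.executor.extraJavaOptions",
>>          "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");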
>>
>> So if your job has a GC problem, your options are:
>>
>> 1) Keep in mind that different data/records mean different memory sizes.
>> You need to monitor your job, or understand your data.
>> 2) If your current job has a problem, then you know that for this amount
>> of data your settings are not good. You can:
>>     a) increase your heap size (but I don't like to set it to over 24G, as
>> full GC may then take too long)
>>     b) create more partitions. In the above example, I created 6000
>> partitions, as I know the total record count is about 18B. If I create 9000
>> partitions, then on average each partition will hold around 2M records,
>> which will lower the memory pressure.
>>     c) lower your core count. In the above example, if I change the cores
>> per executor to 2, then only 2 tasks can run concurrently within one
>> executor, which means 6M records instead of the original 12M, so my memory
>> pressure will also be lower.
>>
>> Spark indeed gives you lots of information about the performance of your
>> job; you just need to understand it and make changes based on it.
>>
>> Yong
>>
>> ------------------------------
>> Date: Fri, 21 Aug 2015 20:43:48 +0530
>> Subject: Re: How to avoid executor time out on yarn spark while dealing
>> with large shuffle skewed data?
>> From: umesh.kacha@gmail.com
>> To: m_albert137@yahoo.com
>> CC: user@spark.apache.org; sandy.ryza@cloudera.com
>>
>>
>> Hi Sandy, yes you are right, most of the time is going into GC. After
>> finishing around 500 jobs my Spark job becomes slower because of GC; please
>> see the attached image. I am not able to fix this, please help me solve it.
>> I have been trying to make this Spark job work for the last month and it
>> has never run fully; YARN kills executors because of GC pauses and hence
>> timeouts. I am giving sufficient memory to the executors, so why is it
>> spending most of its time in GC? Initially, when the job starts, it is
>> incredibly fast; when GC kicks in it becomes very slow and things get
>> messy. I have the following config: spark.shuffle.memoryFraction 0.4,
>> spark.storage.memoryFraction 0.4, spark.network.timeout 1000 sec,
>> spark.shuffle.file.buffer 128k, spark.yarn.executor.memoryOverhead 3500,
>> and I am also using Kryo serialization. I am giving 25 GB and 6 cores to
>> each executor and I have 30 such executors. What is wrong? I feel like
>> giving up, please guide. My code is very simple: I have 1000 Runnables
>> which I invoke from the driver, and each Runnable fires four group by
>> queries using hiveContext.sql. My data set is large and in ORC format, so I
>> load the ORC files into a DataFrame, do some processing, and finally call
>> hiveContext.sql. Do I need to refactor the code? How do I solve this
>> problem? I am sure data shuffling is the problem; how do I avoid it, given
>> that my data is skewed? Please guide, thanks much.
>> On Aug 21, 2015 4:24 PM, "Michael Albert" <m_...@yahoo.com> wrote:
>>
>> This is something of a wild guess, but I find that when executors start
>> disappearing for no obvious reason, it is usually because the YARN
>> node-managers have decided that the containers are using too much memory
>> and have terminated the executors.
>>
>> Unfortunately, to see evidence of this, one needs to carefully review the
>> yarn node-manager logs
>> on the workers -- it doesn't seem to show up in the UI.
>>
>> What I generally do is some combination of:
>>    1) increasing executor memory (often also decreasing the number of
>> executors)
>>    2) decreasing the number of cores per executor
>>    3) increasing the executor memory overhead.
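>>
>> A minimal sketch of those three knobs as Spark properties, with hypothetical
>> values (they can equally go on spark-submit as --executor-memory,
>> --executor-cores and --conf):
>>
>> SparkConf conf = new SparkConf()
>>     .set("spark.executor.memory", "20g")                 // 1) bigger executors
>>     .set("spark.executor.cores", "2")                    // 2) fewer concurrent tasks per executor
>>     .set("spark.yarn.executor.memoryOverhead", "3500");  // 3) more off-heap headroom, in MB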
>>
>> Good luck.
>>
>> -Mike
>>
>> ------------------------------
>> *From:* Sandy Ryza <sa...@cloudera.com>
>> *To:* Umesh Kacha <um...@gmail.com>
>> *Cc:* "user@spark.apache.org" <us...@spark.apache.org>
>> *Sent:* Thursday, August 20, 2015 5:21 PM
>> *Subject:* Re: How to avoid executor time out on yarn spark while
>> dealing with large shuffle skewed data?
>>
>> GC wouldn't necessarily result in errors - it could just be slowing down
>> your job and causing the executor JVMs to stall.  If you click on a stage
>> in the UI, you should end up on a page with all the metrics concerning the
>> tasks that ran in that stage.  "GC Time" is one of these task metrics.
>>
>> -Sandy
>>
>>
>>
>> On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha <um...@gmail.com>
>> wrote:
>>
>> Hi, where do I see GC time in the UI? I have set
>> spark.yarn.executor.memoryOverhead to 3500, which I believe should be good
>> enough. So you mean only GC could be the reason behind the timeout? I
>> checked the YARN logs and did not see any GC error there. Please guide.
>> Thanks much.
>>
>> On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza <sa...@cloudera.com>
>> wrote:
>>
>> Moving this back onto user@
>>
>> Regarding GC, can you look in the web UI and see whether the "GC time"
>> metric dominates the amount of time spent on each task (or at least the
>> tasks that aren't completing)?
>>
>> Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
>> YARN may be killing your executors for using too much off-heap space.  You
>> can see whether this is happening by looking in the Spark AM or YARN
>> NodeManager logs.
>>
>> -Sandy
>>
>> On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha <um...@gmail.com>
>> wrote:
>>
>> Hi, thanks much for the response. Yes, I tried the default setting of 0.2
>> too; it was also timing out. If it is spending time in GC, then why is it
>> not throwing a GC error? I don't see any such error, and the YARN logs are
>> not helpful at all. What is Tungsten and how do I use it? Spark is doing
>> great, I believe: my job runs successfully and 60% of the tasks complete;
>> only after the first executor gets lost do things get messy.
>> On Aug 20, 2015 7:59 PM, "Sandy Ryza" <sa...@cloudera.com> wrote:
>>
>> What sounds most likely is that you're hitting heavy garbage collection.
>> Did you hit issues when the shuffle memory fraction was at its default of
>> 0.2?  A potential danger with setting the shuffle storage to 0.7 is that it
>> allows shuffle objects to get into the GC old generation, which triggers
>> more stop-the-world garbage collections.
>>
>> Have you tried enabling Tungsten / unsafe?
>>
>> Unfortunately, Spark is still not that great at dealing with
>> heavily-skewed shuffle data, because its reduce-side aggregation still
>> operates on Java objects instead of binary data.
>>
>> -Sandy
>>
>> On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha <um...@gmail.com>
>> wrote:
>>
>> Hi Sandy, thanks much for the response. I am using Spark 1.4.1 and I have
>> set spark.shuffle.storage to 0.7, as my Spark job involves 4 group by
>> queries executed using hiveContext.sql. My data set is skewed, so I believe
>> there will be more shuffling. I don't know what's wrong; the Spark job runs
>> fine for almost an hour, and when the shuffle read/shuffle write column in
>> the UI starts to show more than 10 GB, an executor starts getting lost
>> because of a timeout, and slowly other executors start getting lost too.
>> Please guide.
>> On Aug 20, 2015 7:38 PM, "Sandy Ryza" <sa...@cloudera.com> wrote:
>>
>> What version of Spark are you using?  Have you set any shuffle configs?
>>
>> On Wed, Aug 19, 2015 at 11:46 AM, unk1102 <um...@gmail.com> wrote:
>>
>> I have one Spark job which seems to run fine, but after an hour or so
>> executors start getting lost because of timeouts, with something like the
>> following error:
>>
>> cluster.yarnScheduler : Removing an executor 14 650000 timeout exceeds
>> 600000 seconds
>>
>> and because of the above error, a couple of chained errors start to appear,
>> like FetchFailedException, Rpc client disassociated, Connection reset by
>> peer, IOException, etc.
>>
>> Please see the following UI page. I have noticed that when shuffle
>> read/write grows to more than 10 GB, executors start getting lost because
>> of timeouts. How do I clear this accumulated 10 GB in the shuffle
>> read/write section? I don't cache anything, so why is Spark not clearing
>> that memory? Please guide.
>>
>> IMG_20150819_231418358.jpg
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
>> >
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>