You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Nikolaos Tsipas (JIRA)" <ji...@apache.org> on 2017/09/06 13:56:01 UTC

[jira] [Commented] (SPARK-21935) Pyspark UDF causing ExecutorLostFailure

    [ https://issues.apache.org/jira/browse/SPARK-21935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155378#comment-16155378 ] 

Nikolaos Tsipas commented on SPARK-21935:
-----------------------------------------

Hmm.. I think I understand it a bit better now after some reading about how pyspark works. 

The problem is that the amount of memory used by python on the executor is separate from the amount of memory used by the JVM. However, all the calculations in spark-defaults.conf are based on the assumption that only JVM will be consuming memory on the executor/YARN container. 

Would the right approach then be to minimise the amount of memory used by the JVM in the container and leave the container memory available to python?

> Pyspark UDF causing ExecutorLostFailure 
> ----------------------------------------
>
>                 Key: SPARK-21935
>                 URL: https://issues.apache.org/jira/browse/SPARK-21935
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.0
>            Reporter: Nikolaos Tsipas
>              Labels: pyspark, udf
>         Attachments: cpu.png, Screen Shot 2017-09-06 at 11.30.28.png, Screen Shot 2017-09-06 at 11.31.13.png, Screen Shot 2017-09-06 at 11.31.31.png
>
>
> Hi,
> I'm using spark 2.1.0 on AWS EMR (Yarn) and trying to use a UDF in python as follows:
> {code}
> from pyspark.sql.functions import col, udf
> from pyspark.sql.types import StringType
> path = 's3://some/parquet/dir/myfile.parquet'
> df = spark.read.load(path)
> def _test_udf(useragent):
>     return useragent.upper()
> test_udf = udf(_test_udf, StringType())
> df = df.withColumn('test_field', test_udf(col('std_useragent')))
> df.write.parquet('/output.parquet')
> {code}
> The following config is used in {{spark-defaults.conf}} (using {{maximizeResourceAllocation}} in EMR)
> {code}
> ...
> spark.executor.instances         4
> spark.executor.cores             8
> spark.driver.memory              8G
> spark.executor.memory            9658M
> spark.default.parallelism        64
> spark.driver.maxResultSize       3G
> ...
> {code}
> The cluster has 4 worker nodes (+1 master) with the following specs: 8 vCPU, 15 GiB memory, 160 SSD GB storage
> The above example fails every single time with errors like the following:
> {code}
> 17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50, ip-172-31-7-125.eu-west-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> {code}
> I tried to increase the  {{spark.yarn.executor.memoryOverhead}} to 3000 which delays the errors but eventually I get them before the end of the job. The job eventually fails.
> !Screen Shot 2017-09-06 at 11.31.31.png|width=800!
> If I run the above job in scala everything works as expected (without having to adjust the memoryOverhead)
> {code}
> import org.apache.spark.sql.functions.udf
> val upper: String => String = _.toUpperCase
> val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
> val upperUDF = udf(upper)
> val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
> newdf.write.parquet("/output.parquet")
> {code}
> !Screen Shot 2017-09-06 at 11.31.13.png|width=800!
> Cpu utilisation is very bad with pyspark
> !cpu.png|width=800!
> Is this a known bug with pyspark and udfs or is it a matter of bad configuration? 
> Looking forward to suggestions. Thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org