Posted to issues@spark.apache.org by "Nikolaos Tsipas (JIRA)" <ji...@apache.org> on 2017/09/06 10:46:00 UTC
[jira] [Created] (SPARK-21935) Pyspark UDF causing ExecutorLostFailure
Nikolaos Tsipas created SPARK-21935:
---------------------------------------
Summary: Pyspark UDF causing ExecutorLostFailure
Key: SPARK-21935
URL: https://issues.apache.org/jira/browse/SPARK-21935
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.1.0
Reporter: Nikolaos Tsipas
Hi,
I'm using Spark 2.1.0 on AWS EMR and trying to use a Python UDF as follows:
{code}
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# `spark` is the SparkSession provided by the pyspark shell
path = 's3://some/parquet/dir/myfile.parquet'
df = spark.read.load(path)

def _test_udf(useragent):
    # Upper-case the user agent string (evaluated in a Python worker process)
    return useragent.upper()

test_udf = udf(_test_udf, StringType())
df = df.withColumn('test_field', test_udf(col('std_useragent')))
df.write.parquet('/output.parquet')
{code}
The following config is used in {{spark-defaults.conf}} (with {{maximizeResourceAllocation}} enabled on EMR):
{code}
...
spark.executor.instances 4
spark.executor.cores 8
spark.driver.memory 8G
spark.executor.memory 9658M
spark.default.parallelism 64
spark.driver.maxResultSize 3G
...
{code}
The cluster has 4 worker nodes (plus 1 master) with the following specs: 8 vCPUs, 15 GiB memory, 160 GB SSD storage.
The above example fails every single time with errors like the following:
{code}
17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50, ip-172-31-7-125.eu-west-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
{code}
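The 10.4 GB limit in this message follows from the executor settings above. A minimal sketch, assuming the default overhead rule documented for Spark 2.1 on YARN (when {{spark.yarn.executor.memoryOverhead}} is not set, the overhead is 10% of {{spark.executor.memory}}, with a minimum of 384 MB) and ignoring any rounding YARN applies to container requests. The helper name {{container_size_mb}} is hypothetical, not a Spark API:

```python
# Hypothetical helper (not part of Spark): estimates the memory YARN is asked for
# per executor container, using the Spark 2.1 default overhead rule.
MIN_OVERHEAD_MB = 384   # default lower bound for the overhead
OVERHEAD_FACTOR = 0.10  # default overhead as a fraction of spark.executor.memory


def container_size_mb(executor_memory_mb, overhead_mb=None):
    """Return (overhead_mb, total_mb) for one executor container.

    overhead_mb mirrors spark.yarn.executor.memoryOverhead; None means "not set".
    """
    if overhead_mb is None:
        # 10% of the executor heap, truncated to whole MB, but never below 384 MB
        overhead_mb = max(int(OVERHEAD_FACTOR * executor_memory_mb), MIN_OVERHEAD_MB)
    return overhead_mb, executor_memory_mb + overhead_mb


overhead, total = container_size_mb(9658)  # spark.executor.memory 9658M
print(overhead, total, round(total / 1024, 1))  # prints: 965 10623 10.4
```

So each executor container is capped at roughly 9658 + 965 = 10623 MB, which matches the "10.4 GB of 10.4 GB" in the log. With the overhead set explicitly to 3000, {{container_size_mb(9658, 3000)}} gives 12658 MB (about 12.4 GB) per container.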
I tried increasing {{spark.yarn.executor.memoryOverhead}} to 3000 (MB). This delays the errors, but they still appear before the end of the job, and the job eventually fails.
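One possible way to reason about why a larger overhead only delays the failure (an interpretation, not something established in this report): YARN checks the physical memory of the whole container process tree, which includes the Python worker processes PySpark uses to evaluate the UDF, and those processes live outside the JVM heap, so their memory has to fit in the overhead. A back-of-the-envelope sketch, assuming one Python worker per concurrently running task (up to {{spark.executor.cores}} = 8 per executor) and treating the entire overhead as available to them, which overstates their share because the JVM also uses part of it for off-heap allocations. The helper {{overhead_per_worker_mb}} is hypothetical:

```python
# Back-of-the-envelope estimate; assumes one Python worker per running task
# and that the entire overhead is available to the Python workers.
EXECUTOR_CORES = 8  # spark.executor.cores from spark-defaults.conf


def overhead_per_worker_mb(overhead_mb, cores=EXECUTOR_CORES):
    """Split the container overhead evenly across concurrently running tasks."""
    return overhead_mb / cores


# 965 MB: default overhead for a 9658M executor; 3000 MB: the increased setting
for overhead in (965, 3000):
    print(overhead, overhead_per_worker_mb(overhead))
# prints:
# 965 120.625
# 3000 375.0
```

Under these assumptions, each Python worker has at most a few hundred MB of headroom even with the larger setting, so a worker whose memory keeps growing over the course of the job could still push the container past its limit.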
If I run the same job in Scala, everything works as expected (without having to adjust {{spark.yarn.executor.memoryOverhead}}):
{code}
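import org.apache.spark.sql.functions.{col, udf}

// Same transformation as the PySpark version, but the UDF runs inside the executor JVM.
// `spark` is the SparkSession provided by spark-shell.
val upper: String => String = _.toUpperCase
val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
val upperUDF = udf(upper)
val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
newdf.write.parquet("/output.parquet")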
{code}
CPU utilisation is very poor with PySpark.
Is this a known bug with PySpark UDFs, or is it a matter of bad configuration?
Looking forward to suggestions. Thanks!
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)