Posted to user@spark.apache.org by Nick Chammas <ni...@gmail.com> on 2017/06/22 16:08:17 UTC
Trouble with PySpark UDFs and SPARK_HOME only on EMR
I’m seeing a strange issue on EMR, which I posted about here:
<https://forums.aws.amazon.com/thread.jspa?threadID=248805&tstart=0&messageID=790954#790954>
In brief, when I try to import a UDF I’ve defined, Python somehow fails to
find Spark. This exact code works for me locally and works on our
on-premises CDH cluster under YARN.
This is the traceback:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 318, in show
    print(self._jdf.showString(n, 20))
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o89.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-97-35-12.ec2.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 161, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 91, in read_udfs
    _, udf = read_single_udf(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 78, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 451, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/splinkr/person.py", line 7, in <module>
    from splinkr.util import repartition_to_size
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/splinkr/util.py", line 34, in <module>
    containsNull=False,
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/sql/functions.py", line 1872, in udf
    return UserDefinedFunction(f, returnType)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/sql/functions.py", line 1830, in __init__
    self._judf = self._create_judf(name)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/sql/functions.py", line 1834, in _create_judf
    sc = SparkContext.getOrCreate()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/context.py", line 310, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/context.py", line 259, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1498141399866_0005/container_1498141399866_0005_01_000002/pyspark.zip/pyspark/java_gateway.py", line 77, in launch_gateway
    proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
  File "/usr/lib64/python3.5/subprocess.py", line 950, in __init__
    restore_signals, start_new_session)
  File "/usr/lib64/python3.5/subprocess.py", line 1544, in _execute_child
    raise child_exception_type(errno_num, err_msg)
FileNotFoundError: [Errno 2] No such file or directory: './bin/spark-submit'
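
Reading the traceback bottom-up, the failure seems to happen while the executor unpickles the UDF: unpickling imports splinkr.person, which imports splinkr.util, and a module-level udf(...) call there lands in SparkContext.getOrCreate() and tries to spawn './bin/spark-submit' on the executor, presumably because SPARK_HOME isn't set in that environment. Here is a minimal sketch of the pattern the traceback points at (the function body and exact return type are my reconstruction, not the real splinkr code):

# splinkr/util.py (reconstructed sketch, not the actual file)
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def repartition_to_size(values):
    ...  # real logic omitted from this sketch

# This udf() call runs at import time, so merely unpickling the UDF on an
# executor re-imports this module and triggers SparkContext.getOrCreate()
# there, which in turn tries to launch './bin/spark-submit'.
repartition_to_size_udf = udf(
    repartition_to_size,
    ArrayType(IntegerType(), containsNull=False),
)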
Does anyone have clues about what might be going on?
Nick
Re: Trouble with PySpark UDFs and SPARK_HOME only on EMR
Posted by Nicholas Chammas <ni...@gmail.com>.
Here’s a repro for a very similar issue where Spark hangs on the UDF, which
I think is related to the SPARK_HOME issue. I posted the repro on the EMR
forum <https://forums.aws.amazon.com/thread.jspa?messageID=791019󁇫>,
but in case you can’t access it:
1. I’m running EMR 5.6.0, Spark 2.1.1, and Python 3.5.1.

2. Create a simple Python package by creating a directory called udftest.

3. Inside udftest, put an empty __init__.py and a nothing.py.

4. nothing.py should have the following contents:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def do_nothing(s: int) -> int:
    return s

do_nothing_udf = udf(do_nothing, IntegerType())

5. From your home directory (the one that contains your udftest package), create a ZIP that we will ship to YARN:

pushd udftest/
zip -rq ../udftest.zip *
popd

6. Start a PySpark shell with our test package:

export PYSPARK_PYTHON=python3
pyspark \
  --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=$PYSPARK_PYTHON" \
  --archives "udftest.zip#udftest"

7. Now try to use the UDF. It will hang:

from udftest.nothing import do_nothing_udf
spark.range(10).select(do_nothing_udf('id')).show()  # hangs

8. The strange thing is, if you define the exact same UDF directly in the active PySpark shell, it works fine! It’s only when you import it from a user-defined module that you see this issue. (A variant of nothing.py that keeps udf() off the import path is sketched below.)
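
One workaround I plan to try is deferring the udf() call until the driver already has a live SparkContext, instead of creating the UDF at import time. Below is a sketch of that variant of nothing.py; I haven't tested it yet, so I can't confirm it avoids the hang:

# udftest/nothing.py, lazy variant (untested sketch)
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def do_nothing(s: int) -> int:
    return s

def make_do_nothing_udf():
    # udf() calls SparkContext.getOrCreate() under the hood (see the
    # traceback in the original message), so only call this on the
    # driver once the shell or session is up, never at import time.
    return udf(do_nothing, IntegerType())

You would then use it from the shell like this:

from udftest.nothing import make_do_nothing_udf
do_nothing_udf = make_do_nothing_udf()
spark.range(10).select(do_nothing_udf('id')).show()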
On Thu, Jun 22, 2017 at 12:08 PM Nick Chammas <ni...@gmail.com>
wrote:
> I’m seeing a strange issue on EMR, which I posted about here:
> <https://forums.aws.amazon.com/thread.jspa?threadID=248805&tstart=0&messageID=790954#790954>
>
> In brief, when I try to import a UDF I’ve defined, Python somehow fails to
> find Spark. This exact code works for me locally and works on our
> on-premises CDH cluster under YARN.
>
> [...]