Posted to issues@spark.apache.org by "Oli Hall (JIRA)" <ji...@apache.org> on 2017/09/15 11:49:00 UTC
[jira] [Created] (SPARK-22023) Multi-column Spark SQL UDFs broken in Python 3
Oli Hall created SPARK-22023:
--------------------------------
Summary: Multi-column Spark SQL UDFs broken in Python 3
Key: SPARK-22023
URL: https://issues.apache.org/jira/browse/SPARK-22023
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.2.0
Environment: Python3
Reporter: Oli Hall
I've been testing some existing PySpark code after migrating to Python 3, and there seems to be an issue with multi-column UDFs in Spark SQL. Any UDF that takes more than one column as input fails with a TypeError relating to the expansion of an underlying lambda expression:
{code}
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 237, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 226, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
TypeError: <lambda>() takes 1 positional argument but 2 were given
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}
This syntax should (and does) work in Python 2, but is not valid in Python 3, I believe.
Here is a minimal example that reproduces the error in the PySpark shell, with Python 3.6.2 and Spark 2.2:
{code:python}
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import LongType
>>>
>>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, 'b': 2}]))
/Users/oli-hall/Documents/Code/spark2/python/pyspark/sql/session.py:351: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
warnings.warn("Using RDD of dict to inferSchema is deprecated. "
>>> df.printSchema()
root
|-- a: long (nullable = true)
|-- b: long (nullable = true)
>>> sum_udf = udf(lambda x: x[0] + x[1], LongType())
>>>
>>> with_sum = df.withColumn('sum', sum_udf('a', 'b'))
>>>
>>> with_sum.first()
17/09/15 11:43:56 ERROR executor.Executor: Exception in task 2.0 in stage 3.0 (TID 8)
...
TypeError: <lambda>() takes 1 positional argument but 2 were given
{code}
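The last Python frame in the traceback, {{return lambda *a: f(*a)}} in worker.py, appears to forward the value of each input column to the UDF as a separate positional argument, so {{sum_udf('a', 'b')}} ends up calling the one-parameter lambda with two arguments. The sketch below is plain Python and assumes only that calling convention; {{wrap}} is a hypothetical stand-in for the worker code, not PySpark's actual implementation. It reproduces the same TypeError and suggests that a UDF declared with one parameter per input column, as in {{udf(lambda a, b: a + b, LongType())}}, would match the way the worker calls it.

{code:python}
# Hypothetical stand-in for the wrapper shown in the traceback
# (worker.py, line 71): the UDF is called with one positional
# argument per input column.
def wrap(f):
    return lambda *a: f(*a)


# The UDF from the example above: a single parameter that it indexes into.
one_param = wrap(lambda x: x[0] + x[1])

# The same sum, declared with one parameter per input column.
two_params = wrap(lambda a, b: a + b)

# Values of columns a and b for the first row of the example DataFrame.
row_values = (1, 1)

print(two_params(*row_values))  # 2

try:
    one_param(*row_values)
    error_message = None
except TypeError as exc:
    # Reproduces the TypeError from the traceback: 1 parameter, 2 arguments.
    error_message = str(exc)

print(error_message)
{code}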
For now I've worked around it by packing the input columns into a struct and modifying the UDF to read its fields from that struct, but it would be good to be able to use multi-column input again.
Workaround:
{code:python}
>>> from pyspark.sql.functions import udf, struct
>>> from pyspark.sql.types import LongType
>>>
>>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, 'b': 2}]))
>>>
>>> sum_udf = udf(lambda x: x.a + x.b, LongType())
>>>
>>> with_sum = df.withColumn('temp_struct', struct('a', 'b')).withColumn('sum', sum_udf('temp_struct'))
>>> with_sum.first()
Row(a=1, b=1, temp_struct=Row(a=1, b=1), sum=2)
{code}
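The workaround succeeds because {{struct('a', 'b')}} packs both columns into a single value, so the UDF receives exactly one argument and reads the fields by name (the output above shows that value as {{Row(a=1, b=1)}}). A plain-Python sketch of the same idea, using {{collections.namedtuple}} as a hypothetical stand-in for {{pyspark.sql.Row}} and a hypothetical {{wrap}} helper in place of the worker's {{lambda *a: f(*a)}}:

{code:python}
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row: the struct column arrives
# as a single object whose fields are read by name.
TempStruct = namedtuple("TempStruct", ["a", "b"])


def wrap(f):
    # Hypothetical stand-in for the worker wrapper (worker.py, line 71):
    # one positional argument per input column.
    return lambda *a: f(*a)


# The workaround UDF: one parameter holding the packed struct.
sum_udf = wrap(lambda x: x.a + x.b)

# struct('a', 'b') turns the two columns into one, so each call
# receives exactly one argument.
structs = [TempStruct(a=1, b=1), TempStruct(a=2, b=2)]
sums = [sum_udf(s) for s in structs]
print(sums)  # [2, 4]
{code}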