You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/09/16 12:27:02 UTC

[jira] [Resolved] (SPARK-22023) Multi-column Spark SQL UDFs broken in Python 3

     [ https://issues.apache.org/jira/browse/SPARK-22023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22023.
----------------------------------
    Resolution: Invalid

This does not work in master / 2.1.0 with Python 2.7.x in my local.

I think the error looks legitimate because you defined a lambda that takes {{x}} alone but two arguments {{'a', 'b'}} are given. It should be {{sum_udf = udf(lambda x, y: x + x, LongType())}}.

> Multi-column Spark SQL UDFs broken in Python 3
> ----------------------------------------------
>
>                 Key: SPARK-22023
>                 URL: https://issues.apache.org/jira/browse/SPARK-22023
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0
>         Environment: Python3
>            Reporter: Oli Hall
>
> I've been testing some existing PySpark code after migrating to Python3, and there seems to be an issue with multi-column UDFs in Spark SQL. Essentially, any UDF that takes in more than one column as input fails with an error relating to expansion of an underlying lambda expression:
> {code}
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
>     process()
>   File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 237, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
>     for obj in iterator:
>   File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 226, in _batched
>     for item in iterator:
>   File "<string>", line 1, in <lambda>
>   File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
>     return lambda *a: f(*a)
> TypeError: <lambda>() takes 1 positional argument but 2 were given
> 	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
> 	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
> 	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> 	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
> 	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:108)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> This syntax should (and does) work in Python 2, but is not valid in Python 3, I believe.
> I have a minimal example that reproduces the error, running in the PySpark shell, with Python 3.6.2, Spark 2.2:
> {code}
> >>> from pyspark.sql.functions import udf
> >>> from pyspark.sql.types import LongType
> >>> 
> >>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, 'b': 2}]))
> /Users/oli-hall/Documents/Code/spark2/python/pyspark/sql/session.py:351: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
>   warnings.warn("Using RDD of dict to inferSchema is deprecated. "
> >>> df.printSchema()
> root
>  |-- a: long (nullable = true)
>  |-- b: long (nullable = true)
> >>> sum_udf = udf(lambda x: x[0] + x[1], LongType())
> >>> 
> >>> with_sum = df.withColumn('sum', sum_udf('a', 'b'))
> >>> 
> >>> with_sum.first()
> 17/09/15 11:43:56 ERROR executor.Executor: Exception in task 2.0 in stage 3.0 (TID 8)
> ... (error snipped)
> TypeError: <lambda>() takes 1 positional argument but 2 were given
> {code}
> I've managed to work around it for now, by pushing the input columns into a struct, then modifying the UDF to read from the struct, but it'd be good if I could use multi-column input again.
> Workaround:
> {code}
> >>> from pyspark.sql.functions import udf, struct
> >>> from pyspark.sql.types import LongType
> >>> 
> >>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, 'b': 2}]))
> >>> 
> >>> sum_udf = udf(lambda x: x.a + x.b, LongType())
> >>> 
> >>> with_sum = df.withColumn('temp_struct', struct('a', 'b')).withColumn('sum', sum_udf('temp_struct'))
> >>> with_sum.first()
> Row(a=1, b=1, temp_struct=Row(a=1, b=1), sum=2)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org