Posted to issues@spark.apache.org by "Oli Hall (JIRA)" <ji...@apache.org> on 2017/09/15 11:49:00 UTC

[jira] [Created] (SPARK-22023) Multi-column Spark SQL UDFs broken in Python 3

Oli Hall created SPARK-22023:
--------------------------------

             Summary: Multi-column Spark SQL UDFs broken in Python 3
                 Key: SPARK-22023
                 URL: https://issues.apache.org/jira/browse/SPARK-22023
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.2.0
         Environment: Python3
            Reporter: Oli Hall


I've been testing some existing PySpark code after migrating to Python 3, and there seems to be an issue with multi-column UDFs in Spark SQL. Essentially, any UDF that takes more than one column as input fails with an error relating to the expansion of an underlying lambda expression:

{code}
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 237, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/serializers.py", line 226, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "<my $SPARK_HOME>/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
TypeError: <lambda>() takes 1 positional argument but 2 were given

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

This syntax should (and does) work in Python 2, but is not valid in Python 3, I believe.
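
For reference, the wrapping quoted in the traceback above (worker.py line 71: return lambda *a: f(*a)) can be reproduced outside Spark. A minimal plain-Python sketch, assuming the worker passes one positional argument per input column:

{code:python}
# Plain-Python sketch of the call shape from the traceback above; no Spark involved.
f = lambda x: x[0] + x[1]     # UDF body that expects a single, indexable argument
wrapped = lambda *a: f(*a)    # how worker.py forwards the column values (quoted above)

wrapped((1, 1))               # OK: one positional argument, returns 2
wrapped(1, 1)                 # TypeError: <lambda>() takes 1 positional argument but 2 were given
{code}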

I have a minimal example that reproduces the error, run in the PySpark shell with Python 3.6.2 and Spark 2.2:

{code:python}
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import LongType
>>> 
>>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, 'b': 2}]))
/Users/oli-hall/Documents/Code/spark2/python/pyspark/sql/session.py:351: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "
>>> df.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)

>>> sum_udf = udf(lambda x: x[0] + x[1], LongType())
>>> 
>>> with_sum = df.withColumn('sum', sum_udf('a', 'b'))
>>> 
>>> with_sum.first()
17/09/15 11:43:56 ERROR executor.Executor: Exception in task 2.0 in stage 3.0 (TID 8)
...
TypeError: <lambda>() takes 1 positional argument but 2 were given
{code}

I've managed to work around it for now by pushing the input columns into a struct and modifying the UDF to read its fields from that struct, but it would be good to be able to use multi-column input directly again.

Workaround:
{code:python}
>>> from pyspark.sql.functions import udf, struct
>>> from pyspark.sql.types import LongType
>>> 
>>> df = spark.createDataFrame(sc.parallelize([{'a': 1, 'b': 1}, {'a': 2, 'b': 2}]))
>>> 
>>> sum_udf = udf(lambda x: x.a + x.b, LongType())
>>> 
>>> with_sum = df.withColumn('temp_struct', struct('a', 'b')).withColumn('sum', sum_udf('temp_struct'))
>>> with_sum.first()
Row(a=1, b=1, temp_struct=Row(a=1, b=1), sum=2)
{code}
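
If the helper column isn't wanted in the final output, it can be dropped once the sum has been computed. A small follow-on sketch using the same names as above (not re-run in this environment):

{code:python}
>>> with_sum.drop('temp_struct').first()  # should give Row(a=1, b=1, sum=2)
{code}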


