Posted to issues@spark.apache.org by "Alexander Tronchin-James (Jira)" <ji...@apache.org> on 2020/03/06 19:19:00 UTC

[jira] [Commented] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)

    [ https://issues.apache.org/jira/browse/SPARK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053715#comment-17053715 ] 

Alexander Tronchin-James commented on SPARK-29367:
--------------------------------------------------

The fix suggested above, adding ARROW_PRE_0_15_IPC_FORMAT=1 to SPARK_HOME/conf/spark-env.sh, did NOT resolve the issue for me.

Instead, I had to set this environment variable in my Python environment, as described here: https://stackoverflow.com/questions/58269115/how-to-enable-apache-arrow-in-pyspark/58273294#58273294
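
For reference, here is a minimal sketch of what setting the variable from Python might look like. It is an assumption about the approach, not a copy of the linked answer: the helper names enable_legacy_arrow_ipc and build_session are hypothetical, and forwarding the variable through spark.executorEnv.* should only matter when executors run in processes that do not inherit the driver's environment. The variable should be set before any SparkSession or SparkContext is created.

```python
import os

# Name of the compatibility switch discussed in this ticket.
ARROW_COMPAT_VAR = "ARROW_PRE_0_15_IPC_FORMAT"


def enable_legacy_arrow_ipc(env=os.environ):
    """Set the switch in `env` and return matching Spark conf entries.

    Hypothetical helper, for illustration only.
    """
    env[ARROW_COMPAT_VAR] = "1"
    # spark.executorEnv.<NAME> asks Spark to export <NAME> on executors.
    return {"spark.executorEnv." + ARROW_COMPAT_VAR: "1"}


def build_session(conf):
    """Create a SparkSession with the given conf entries (needs pyspark)."""
    from pyspark.sql import SparkSession  # imported lazily on purpose

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Call this before any SparkSession or SparkContext exists.
executor_conf = enable_legacy_arrow_ipc()
```

In the reproduction script quoted below, spark = build_session(executor_conf) would then take the place of SparkSession.builder.getOrCreate().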

> pandas udf not working with latest pyarrow release (0.15.0)
> -----------------------------------------------------------
>
>                 Key: SPARK-29367
>                 URL: https://issues.apache.org/jira/browse/SPARK-29367
>             Project: Spark
>          Issue Type: Documentation
>          Components: PySpark
>    Affects Versions: 2.4.0, 2.4.1, 2.4.3
>            Reporter: Julien Peloton
>            Assignee: Bryan Cutler
>            Priority: Major
>             Fix For: 2.4.5, 3.0.0
>
>
> Hi,
> I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my PySpark jobs using pandas UDFs are failing with java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import BooleanType
> import pandas as pd
> @pandas_udf(BooleanType(), PandasUDFType.SCALAR)
> def qualitycuts(nbad: pd.Series, rb: pd.Series, magdiff: pd.Series) -> pd.Series:
>     """ Apply simple quality cuts
>     Returns
>     ----------
>     out: pandas.Series of booleans
>     Return a pandas Series with the appropriate flag: false for bad alert,
>         and true for good alert.
>     """
>     mask = nbad.values == 0
>     mask *= rb.values >= 0.55
>     mask *= abs(magdiff.values) <= 0.1
>     return pd.Series(mask)
> spark = SparkSession.builder.getOrCreate()
> # Create dummy DF
> colnames = ["nbad", "rb", "magdiff"]
> df = spark.sparkContext.parallelize(
>     zip(
>         [0, 1, 0, 0],
>         [0.01, 0.02, 0.6, 0.01],
>         [0.02, 0.05, 0.1, 0.01]
>     )
> ).toDF(colnames)
> df.show()
> # Apply cuts
> df = df\
>     .withColumn("toKeep", qualitycuts(*colnames))\
>     .filter("toKeep == true")\
>     .drop("toKeep")
> # This will fail if latest pyarrow 0.15.0 is used
> df.show()
> {code}
> and the log is:
> {code}
> Driver stacktrace:
> 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
> Traceback (most recent call last):
>   File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in <module>
>     df.show()
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
> 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
> 	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
> 	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
> 	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
> 	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
> 	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
> 	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
> 	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
> 	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
> 	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:121)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> I am not sure what the root cause of this failure is, but I note there is an open ticket (https://issues.apache.org/jira/browse/ARROW-6429) suggesting that some work is ongoing on the Spark side.
> I guess any user upgrading pyarrow would face the same error right away, and any help or feedback would be appreciated.
> Thanks,
> Julien


