Posted to issues@spark.apache.org by "Ruben Berenguel (Jira)" <ji...@apache.org> on 2019/11/29 20:59:00 UTC

[jira] [Commented] (SPARK-30063) Failure when returning a value from multiple Pandas UDFs

    [ https://issues.apache.org/jira/browse/SPARK-30063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985192#comment-16985192 ] 

Ruben Berenguel commented on SPARK-30063:
-----------------------------------------

Hi [~tkellogg], I'd like to have a look. Do you have some small, shareable reproducible data/code? Otherwise it's hard to pinpoint on which side (Spark, Arrow-Spark, Python) the problem lies, since it may well be a combination of the three. My hunch is that either the schema is being passed incorrectly (as in your related bug) or, conversely, the schema is passed correctly and the data is not (different order). When that happens, the Arrow reader on the JVM side can't make sense of the message it receives, and the error would look like this one.
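
To make that concrete, here is a minimal, self-contained sketch (an illustration only, not code from this report) of how a corrupted or misaligned Arrow IPC stream surfaces as a low-level deserialization error: the stream reader trusts the length prefix of its first message, so a payload that is truncated or shifted fails before any data is decoded, much like the JVM-side reader does in the stack trace below.

{code:java}
# Illustration: pyarrow's stream reader fails on a corrupted payload.
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [pa.array([1.0, 2.0]), pa.array([3.0, 4.0])], names=["_0", "_1"])

sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)
writer.write_batch(batch)
writer.close()
payload = sink.getvalue().to_pybytes()

# A well-formed stream round-trips fine.
print(pa.ipc.open_stream(payload).read_all())

# Truncating the schema message makes the reader fail while parsing it,
# analogous to the IllegalArgumentException thrown on the JVM side.
try:
    pa.ipc.open_stream(payload[:20]).read_all()
except Exception as e:
    print("reader error:", e)
{code}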

> Failure when returning a value from multiple Pandas UDFs
> --------------------------------------------------------
>
>                 Key: SPARK-30063
>                 URL: https://issues.apache.org/jira/browse/SPARK-30063
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3, 2.4.4
>         Environment: Happens on Mac & Ubuntu (Docker). Seems to happen on both 2.4.3 and 2.4.4
>            Reporter: Tim Kellogg
>            Priority: Major
>         Attachments: spark-debug.txt
>
>
> I have 20 Pandas UDFs that I'm trying to evaluate all at the same time.
>  * PandasUDFType.GROUPED_AGG
>  * 3 columns in the input data frame being serialized over Arrow to the Python worker. See below for clarification.
>  * All functions take 2 parameters, some combination of the 3 received as Arrow input.
>  * Varying return types, see details below.
> _*I get an IllegalArgumentException on the Scala side of the worker when deserializing from Python.*_
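>
> For reference, a minimal sketch of the kind of setup described above (hypothetical column and function names, not the actual job): several GROUPED_AGG pandas UDFs, each taking two of the three input columns, evaluated together in a single agg() call.
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import DoubleType, IntegerType
>
> spark = SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
>     [(1, 0.5, 1.5, 2.5), (1, 0.1, 1.1, 2.1), (2, 0.2, 1.2, 2.2)],
>     ["key", "a", "b", "c"])
>
> # Each UDF receives two of the three double columns as pandas Series.
> @pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
> def mean_ratio(x, y):
>     return float((x / y).mean())
>
> @pandas_udf(IntegerType(), PandasUDFType.GROUPED_AGG)
> def count_above(x, y):
>     return int((x > y).sum())
>
> # In the real job, ~20 such UDFs with varying return types are evaluated at once.
> df.groupBy("key").agg(mean_ratio("a", "b"), count_above("b", "c")).show()
> {code}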
> h2. Exception & Stack Trace
> {code:java}
> 19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
> java.lang.IllegalArgumentException
> 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
> 	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
> 	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
> 	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
> 	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
> 	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
> 	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:123)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> 19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
> 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
> 	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
> 	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
> 	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
> 	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
> 	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
> 	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
> 	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:123)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Input Arrow Schema
> I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the schema & message. This is the input; in load_stream the debug code is print(batch, batch.schema, file=log_file)
> {code:java}
> <pyarrow.lib.RecordBatch object at 0x10640ecc8> 
> _0: double
> _1: double
> _2: double
> metadata
> --------
> OrderedDict()
> {code}
> h2. Output Arrow Schema
> I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the schema & message. This is the output; in dump_stream the debug code is print(batch, batch.schema, file=log_file)
> {code:java}
> <pyarrow.lib.RecordBatch object at 0x11ad5b638> _0: float
> _1: float
> _2: float
> _3: int32
> _4: int32
> _5: int32
> _6: int32
> _7: int32
> _8: float
> _9: float
> _10: int32
> _11: list<item: float>
>   child 0, item: float
> _12: list<item: float>
>   child 0, item: float
> _13: float
> _14: float
> _15: int32
> _16: float
> _17: list<item: float>
>   child 0, item: float
> _18: list<item: float>
>   child 0, item: float
> _19: float
> {code}
> h2. Arrow Message
> I edited ArrowPythonRunner.scala at line 163 to print out the Arrow message.
> Debug code:
> {code:java}
> val fw = new java.io.FileWriter("spark-debug.txt", true)
> try {
>   val buf = new Array[Byte](40000)
>   stream.read(buf)
>   fw.write(s"Spark reader\n")
>   for (b <- buf) {
>     fw.write(String.format("%02x", Byte.box(b)))
>   }
>   fw.write(s"\n")
> } finally fw.close()
> {code}
> Debug output (some trailing 0's included for completeness).
> {code:java}
> ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c4400000010000000040000000100000008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff000001022400000014000000040000000000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c030000100000000100000000000000000000003000000000000000000000000000000000000000000000000000000008000000000000000800000000000000000000000000000008000000000000000800000000000000100000000000000000000000000000001000000000000000080000000000000018000000000000000000000000000000180000000000000008000000000000002000000000000000000000000000000020000000000000000800000000000000280000000000000000000000000000002800000000000000080000000000000030000000000000000000000000000000300000000000000008000000000000003800000000000000000000000000000038000000000000000800000000000000400000000000000000000000000000004000000000000000080000000000000048000000000000000000000000000000480000000000000008000000000000005000000000000000000000000000000050000000000000000800000000000000580000000000000000000000000000005800000000000000080000000000000060000000000000000000000000000000600000000000000010000000000000007000000000000000000000000000000070000000000000000800000000000000780000000000000000000000000000007800000000000000100000000000000088000000000000000000000000000000880000000000000008000000000000009
0000000000000000000000000000000900000000000000008000000000000009800000000000000000000000000000098000000000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c8000000000000001000000000000000d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f000000000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
> {code}
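>
> One possible way to inspect a dump like the one above (a sketch only; it assumes spark-debug.txt contains the "Spark reader" marker line followed by a single hex line, which is what the debug code writes): convert the hex back to bytes and let pyarrow try to parse the stream.
> {code:java}
> import pyarrow as pa
>
> # Read the attached dump: the line after the "Spark reader" marker is the hex payload.
> with open("spark-debug.txt") as f:
>     lines = [l.strip() for l in f if l.strip()]
> payload = bytes.fromhex(lines[1])
>
> # If the first message is a readable Arrow schema, this prints it;
> # otherwise it fails much like the JVM-side reader does.
> try:
>     reader = pa.ipc.open_stream(payload)
>     print(reader.schema)
> except Exception as e:
>     print("failed to parse Arrow stream:", e)
> {code}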
>  
> h2. Related Bugs
> I have a related bug that I've gotten where the schema in the input Arrow message was transmitted incorrectly. In that case, the input schema should have been <long, float, long> but was transmitted as <long, long, float>. As a result, the float column was interpreted as a long (equivalent C code to illustrate the behavior:)
> {code:java}
> long reinterpret(double floating_point_number) {
> 	return *(long*)(&floating_point_number);
> }
> {code}
> I got around this bug by making all 3 columns float and converting them to long within the UDF via Pandas Series.apply(np.int). Strangely, Column.astype('float') didn't seem to have an effect; I had to make them float at the source.
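>
> A sketch of that workaround (hypothetical names, reconstructed from the description above): keep the column as float all the way through Arrow and do the conversion to integers inside the UDF on the worker.
> {code:java}
> import numpy as np
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import LongType
>
> # The input Series arrives as float64; convert element-wise inside the worker
> # before aggregating, instead of casting on the Spark side.
> @pandas_udf(LongType(), PandasUDFType.GROUPED_AGG)
> def sum_as_long(x):
>     return int(x.apply(np.int64).sum())
> {code}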
> Along the way, I had trouble with [Python's dict keys being non-deterministic|https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic]. This led to columns being passed to GroupedData.agg() in different orders for each worker and driver process. I've mitigated this by explicitly ordering the columns before sending them to agg. I don't think this is an issue anymore, but I'm calling it out just in case.
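>
> A sketch of that mitigation (hypothetical names, reusing the UDFs from the setup sketch above): build the agg() expression list in a fixed, sorted order so the driver and every worker agree on column positions.
> {code:java}
> # Derive the aggregation expressions in a deterministic order instead of
> # relying on dict iteration order.
> udfs = {"mean_ab": mean_ratio("a", "b"), "count_bc": count_above("b", "c")}
> ordered_exprs = [udfs[name] for name in sorted(udfs)]
> result = df.groupBy("key").agg(*ordered_exprs)
> {code}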
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
