Posted to issues@spark.apache.org by "Tim Kellogg (Jira)" <ji...@apache.org> on 2019/12/02 18:37:00 UTC

[jira] [Updated] (SPARK-30063) Failure when returning a value from multiple Pandas UDFs

     [ https://issues.apache.org/jira/browse/SPARK-30063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tim Kellogg updated SPARK-30063:
--------------------------------
    Description: 
I have 20 Pandas UDFs that I'm trying to evaluate all at the same time.
 * PandasUDFType.GROUPED_AGG
 * 3 columns in the input data frame are serialized over Arrow to the Python worker. See below for clarification.
 * All functions take 2 parameters, some combination of the 3 received as Arrow input.
 * Varying return types, see details below.

_*I get an IllegalArgumentException on the Scala side of the worker when deserializing from Python.*_
h2. Exception & Stack Trace
{code:java}
19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
h2. Input Arrow Schema

I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the schema and message. This is the input, printed in load_stream with print(batch, batch.schema, file=log_file):
{code:java}
<pyarrow.lib.RecordBatch object at 0x10640ecc8> 
_0: double
_1: double
_2: double
metadata
--------
OrderedDict()
{code}
h2. Output Arrow Schema

I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the schema and message. This is the output, printed in dump_stream with print(batch, batch.schema, file=log_file):
{code:java}
<pyarrow.lib.RecordBatch object at 0x11ad5b638> _0: float
_1: float
_2: float
_3: int32
_4: int32
_5: int32
_6: int32
_7: int32
_8: float
_9: float
_10: int32
_11: list<item: float>
  child 0, item: float
_12: list<item: float>
  child 0, item: float
_13: float
_14: float
_15: int32
_16: float
_17: list<item: float>
  child 0, item: float
_18: list<item: float>
  child 0, item: float
_19: float
{code}
h2. Arrow Message

I edited ArrowPythonRunner.scala at line 163 (the read call that appears in the stack trace above) to print out the Arrow message.

Debug code:
{code:java}
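// Append to the debug file so output from earlier runs is kept.
val fw = new java.io.FileWriter("spark-debug.txt", true)
try {
  // A single read() may fill only part of the buffer; unfilled slots stay 0.
  val buf = new Array[Byte](40000)
  stream.read(buf)

  fw.write(s"Spark reader\n")
  // Dump the whole buffer as two-digit hex, one byte at a time.
  for (b <- buf) {
    fw.write(String.format("%02x", Byte.box(b)))
  }
  fw.write(s"\n")
} finally fw.close()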
{code}
Debug output (some trailing 0's included for completeness).
{code:java}
ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c4400000010000000040000000100000008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff00000102240000001400000004000000
0000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c0300001000000001000000000000000000000030000000000000000000000000000000000000000000000000000000080000000000000008000000000000000000000000000000080000000000000008000000000000001000000000000000000000000000000010000000000000000800000000000000180000000000000000000000000000001800000000000000080000000000000020000000000000000000000000000000200000000000000008000000000000002800000000000000000000000000000028000000000000000800000000000000300000000000000000000000000000003000000000000000080000000000000038000000000000000000000000000000380000000000000008000000000000004000000000000000000000000000000040000000000000000800000000000000480000000000000000000000000000004800000000000000080000000000000050000000000000000000000000000000500000000000000008000000000000005800000000000000000000000000000058000000000000000800000000000000600000000000000000000000000000006000000000000000100000000000000070000000000000000000000000000000700000000000000008000000000000007800000000000000000000000000000078000000000000001000000000000000880000000000000000000000000000008800000000000000080000000000000090000000000000000000000000000000900000000000000008000000000000009800000000000000000000000000000098000000000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c8000000000000001000000000000000
d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f000000000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
{code}
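The first eight bytes of the dump above can be read as two little-endian 32-bit integers. java.nio.ByteBuffer.allocate throws IllegalArgumentException when it is given a negative capacity, so a reader that takes the first four bytes as the message length would fail in exactly this way. Arrow 0.15 changed the IPC stream format so that each message starts with a 0xFFFFFFFF continuation marker followed by the length. Whether a pyarrow/Arrow Java version mismatch is what happened here is an assumption and is not confirmed in this report. A minimal Python sketch of the decoding (variable names are illustrative only):

```python
import struct

# First 8 bytes of the debug output above, copied verbatim from the hex dump.
prefix = bytes.fromhex("ffffffff90040000")

# Interpret the bytes as two little-endian signed 32-bit integers.
first_word, second_word = struct.unpack("<ii", prefix)

print(first_word)   # -1
print(second_word)  # 1168
```

If this reading is right, the second word (1168) would be the real metadata length, and a reader expecting the older layout would instead try to allocate a buffer of size -1.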
 
h2. Query Plan at Point of Failure

Right before the failure, I printed out the explain(True) output.

 
{code:java}
== Parsed Logical Plan ==
'Project [structstojson(named_struct(), None) AS key#269, unresolvedalias('accuracy, None), unresolvedalias('areaUnderPR, None), unresolvedalias('areaUnderROC, None), unresolvedalias('confusionMatrix, None), unresolvedalias('count, None), unresolvedalias('f1Score, None), unresolvedalias('f1Score_0, None), unresolvedalias('positiveClassRate, None), unresolvedalias('prCurve, None), unresolvedalias('precision, None), unresolvedalias('precision_0, None), unresolvedalias('predictionRate, None), unresolvedalias('recall, None), unresolvedalias('rocCurve, None), unresolvedalias('specificity, None)]
+- Aggregate [udf(cast(label#146 as double), cast(prediction#15 as double)) AS accuracy#232, _auc_pr(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderPR#233, udf(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderROC#225, array(array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double))), array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double)))) AS confusionMatrix#238, _count(cast(label#146 as double)) AS count#234, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score#231, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score_0#228, _rate(cast(label#146 as double)) AS positiveClassRate#227, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS prCurve#230, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision#236, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision_0#235, _rate(cast(prediction#15 as double)) AS predictionRate#237, udf(cast(label#146 as double), cast(prediction#15 as double)) AS recall#229, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS rocCurve#239, udf(cast(label#146 as double), cast(prediction#15 as double)) AS specificity#226]
   +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146, test AS customer#156, foo AS solution#157, bar AS insight#158, model AS model_name#159, 1.0 AS version#160, model1 AS model_id#161, current_timestamp() AS timestamp#162]
      +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146]
         +- Join Inner, (encounterID#13 = encounterID#145)
            :- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
            :  +- Filter ((false || NOT test#40) = false)
            :     +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (true && (dim1#11 = foo)) AS test#40]
            :        +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
            :           +- Join Cross
            :              :- LogicalRDD [dim1#11, dim2#12, encounterID#13, label#14, prediction#15, probability#16], false
            :              +- LogicalRDD [model_id#23], false
            +- Project [encounterID#145, label#146]
               +- Join Cross
                  :- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, prediction#147, probability#148], false
                  +- LogicalRDD [model_id#23], false
== Analyzed Logical Plan ==
key: string, accuracy: float, areaUnderPR: float, areaUnderROC: float, confusionMatrix: array<array<int>>, count: int, f1Score: float, f1Score_0: float, positiveClassRate: int, prCurve: struct<x:array<float>,y:array<float>>, precision: float, precision_0: float, predictionRate: int, recall: float, rocCurve: struct<x:array<float>,y:array<float>>, specificity: float
Project [structstojson(named_struct(), Some(America/Los_Angeles)) AS key#269, accuracy#232, areaUnderPR#233, areaUnderROC#225, confusionMatrix#238, count#234, f1Score#231, f1Score_0#228, positiveClassRate#227, prCurve#230, precision#236, precision_0#235, predictionRate#237, recall#229, rocCurve#239, specificity#226]
+- Aggregate [udf(cast(label#146 as double), cast(prediction#15 as double)) AS accuracy#232, _auc_pr(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderPR#233, udf(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderROC#225, array(array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double))), array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double)))) AS confusionMatrix#238, _count(cast(label#146 as double)) AS count#234, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score#231, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score_0#228, _rate(cast(label#146 as double)) AS positiveClassRate#227, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS prCurve#230, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision#236, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision_0#235, _rate(cast(prediction#15 as double)) AS predictionRate#237, udf(cast(label#146 as double), cast(prediction#15 as double)) AS recall#229, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS rocCurve#239, udf(cast(label#146 as double), cast(prediction#15 as double)) AS specificity#226]
   +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146, test AS customer#156, foo AS solution#157, bar AS insight#158, model AS model_name#159, 1.0 AS version#160, model1 AS model_id#161, current_timestamp() AS timestamp#162]
      +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146]
         +- Join Inner, (encounterID#13 = encounterID#145)
            :- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
            :  +- Filter ((false || NOT test#40) = false)
            :     +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (true && (dim1#11 = foo)) AS test#40]
            :        +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
            :           +- Join Cross
            :              :- LogicalRDD [dim1#11, dim2#12, encounterID#13, label#14, prediction#15, probability#16], false
            :              +- LogicalRDD [model_id#23], false
            +- Project [encounterID#145, label#146]
               +- Join Cross
                  :- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, prediction#147, probability#148], false
                  +- LogicalRDD [model_id#23], false
== Optimized Logical Plan ==
Aggregate [{} AS key#269, udf(label#146, prediction#15) AS accuracy#232, _auc_pr(label#146, probability#16) AS areaUnderPR#233, udf(label#146, probability#16) AS areaUnderROC#225, array(array(udf(label#146, prediction#15), udf(label#146, prediction#15)), array(udf(label#146, prediction#15), udf(label#146, prediction#15))) AS confusionMatrix#238, _count(label#146) AS count#234, udf(label#146, prediction#15) AS f1Score#231, udf(label#146, prediction#15) AS f1Score_0#228, _rate(label#146) AS positiveClassRate#227, named_struct(x, udf(label#146, probability#16), y, udf(label#146, probability#16)) AS prCurve#230, udf(label#146, prediction#15) AS precision#236, udf(label#146, prediction#15) AS precision_0#235, _rate(prediction#15) AS predictionRate#237, udf(label#146, prediction#15) AS recall#229, named_struct(x, udf(label#146, probability#16), y, udf(label#146, probability#16)) AS rocCurve#239, udf(label#146, prediction#15) AS specificity#226]
+- Project [prediction#15, probability#16, label#146]
   +- Join Inner, (encounterID#13 = encounterID#145)
      :- Project [encounterID#13, prediction#15, probability#16]
      :  +- Filter ((isnotnull(test#40) && (NOT test#40 = false)) && isnotnull(encounterID#13))
      :     +- InMemoryRelation [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, test#40], StorageLevel(disk, memory, deserialized, 1 replicas)
      :           +- *(2) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (dim1#11 = foo) AS test#40]
      :              +- CartesianProduct
      :                 :- *(1) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16]
      :                 :  +- Scan ExistingRDD[dim1#11,dim2#12,encounterID#13,label#14,prediction#15,probability#16]
      :                 +- Scan ExistingRDD[model_id#23]
      +- Join Cross
         :- Project [encounterID#145, label#146]
         :  +- Filter isnotnull(encounterID#145)
         :     +- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, prediction#147, probability#148], false
         +- Project
            +- LogicalRDD [model_id#23], false
== Physical Plan ==
!AggregateInPandas [udf(label#146, prediction#15), _auc_pr(label#146, probability#16), udf(label#146, probability#16), udf(label#146, prediction#15), udf(label#146, prediction#15), udf(label#146, prediction#15), udf(label#146, prediction#15), _count(label#146), udf(label#146, prediction#15), udf(label#146, prediction#15), _rate(label#146), udf(label#146, probability#16), udf(label#146, probability#16), udf(label#146, prediction#15), udf(label#146, prediction#15), _rate(prediction#15), udf(label#146, prediction#15), udf(label#146, probability#16), udf(label#146, probability#16), udf(label#146, prediction#15)], [{} AS key#269, udf(label, prediction)#201 AS accuracy#232, _auc_pr(label, probability)#209 AS areaUnderPR#233, udf(label, probability)#208 AS areaUnderROC#225, array(array(udf(label, prediction)#213, udf(label, prediction)#214), array(udf(label, prediction)#215, udf(label, prediction)#216)) AS confusionMatrix#238, _count(label)#210 AS count#234, udf(label, prediction)#206 AS f1Score#231, udf(label, prediction)#207 AS f1Score_0#228, _rate(label)#212 AS positiveClassRate#227, named_struct(x, udf(label, probability)#219, y, udf(label, probability)#220) AS prCurve#230, udf(label, prediction)#202 AS precision#236, udf(label, prediction)#203 AS precision_0#235, _rate(prediction)#211 AS predictionRate#237, udf(label, prediction)#204 AS recall#229, named_struct(x, udf(label, probability)#217, y, udf(label, probability)#218) AS rocCurve#239, udf(label, prediction)#205 AS specificity#226]
+- Exchange SinglePartition
   +- *(4) Project [prediction#15, probability#16, label#146]
      +- *(4) BroadcastHashJoin [encounterID#13], [encounterID#145], Inner, BuildLeft
         :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
         :  +- *(1) Project [encounterID#13, prediction#15, probability#16]
         :     +- *(1) Filter ((isnotnull(test#40) && (NOT test#40 = false)) && isnotnull(encounterID#13))
         :        +- InMemoryTableScan [encounterID#13, prediction#15, probability#16, test#40], [isnotnull(test#40), (NOT test#40 = false), isnotnull(encounterID#13)]
         :              +- InMemoryRelation [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, test#40], StorageLevel(disk, memory, deserialized, 1 replicas)
         :                    +- *(2) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (dim1#11 = foo) AS test#40]
         :                       +- CartesianProduct
         :                          :- *(1) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16]
         :                          :  +- Scan ExistingRDD[dim1#11,dim2#12,encounterID#13,label#14,prediction#15,probability#16]
         :                          +- Scan ExistingRDD[model_id#23]
         +- CartesianProduct
            :- *(2) Project [encounterID#145, label#146]
            :  +- *(2) Filter isnotnull(encounterID#145)
            :     +- Scan ExistingRDD[dim1#143,dim2#144,encounterID#145,label#146,prediction#147,probability#148]
            +- *(3) Project
               +- Scan ExistingRDD[model_id#23]
{code}
h2. Related Bugs

I have also hit a related bug in which the schema of the input Arrow message was transmitted incorrectly. In that case, the input schema should have been <long, float, long> but was transmitted as <long, long, float>. As a result, the float column was interpreted as a long. Equivalent C code to illustrate the behavior:
{code:java}
long reinterpret(double floating_point_number) {
  /* Read the bits of the double back as a long; no numeric conversion happens. */
  return *(long*)(&floating_point_number);
}
{code}
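The same reinterpretation can be reproduced in Python to see what kind of values to expect when a float column is read as a long. This is only a sketch of the bit-level effect, not of the Spark or Arrow code path; the function name is hypothetical and a 64-bit little-endian layout is assumed:

```python
import struct


def reinterpret_double_as_long(value: float) -> int:
    """Return the 64-bit pattern of a double read back as a signed 64-bit integer."""
    return struct.unpack("<q", struct.pack("<d", value))[0]


print(reinterpret_double_as_long(1.0))       # 4607182418800017408
print(hex(reinterpret_double_as_long(1.0)))  # 0x3ff0000000000000
```

Small floating-point values such as 1.0 turn into very large integers, which is one way to recognize this kind of column mix-up in the output.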
I got around this bug by making all 3 columns float and converting them to long within the UDF via Pandas Series.apply(np.int). Strangely, Column.astype('float') didn't seem to have any effect; I had to make the columns float at the source.

Along the way, I had trouble with [Python's dict keys being non-deterministic|https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic]. This led to columns being passed to GroupedData.agg() in a different order in each worker and driver process. I've mitigated this by explicitly ordering the columns before sending them to agg. I don't think this is an issue anymore, but I'm calling it out just in case.
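
The explicit ordering mentioned above can be sketched as follows. The helper name and the metric names are hypothetical, and plain strings stand in for the aggregate Column expressions so the example runs without Spark:

```python
def ordered_columns(named_columns):
    """Return the values of a name -> column mapping, sorted by name."""
    return [named_columns[name] for name in sorted(named_columns)]


# Plain strings stand in for the aggregate Column expressions.
metrics = {"recall": "recall_udf", "accuracy": "accuracy_udf", "f1Score": "f1_udf"}
print(ordered_columns(metrics))  # ['accuracy_udf', 'f1_udf', 'recall_udf']
```

Sorting by name yields the same order in every process regardless of how the dict iterates, so the resulting list could then be unpacked into GroupedData.agg(*columns).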

 

 

  was:
I have 20 Pandas UDFs that I'm trying to evaluate all at the same time.
 * PandasUDFType.GROUPED_AGG
 * 3 columns in the input data frame being serialized over Arrow to Python worker. See below for clarification.
 * All functions take 2 parameters, some combination of the 3 received as Arrow input.
 * Varying return types, see details below.

_*I get an IllegalArgumentException on the Scala side of the worker when deserializing from Python.*_
h2. Exception & Stack Trace
{code:java}
19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}
h2. Input Arrow Schema

I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the schema and message. This is the input, printed in load_stream with print(batch, batch.schema, file=log_file):
{code:java}
<pyarrow.lib.RecordBatch object at 0x10640ecc8> 
_0: double
_1: double
_2: double
metadata
--------
OrderedDict()
{code}
h2. Output Arrow Schema

I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the schema and message. This is the output, printed in dump_stream with print(batch, batch.schema, file=log_file):
{code:java}
<pyarrow.lib.RecordBatch object at 0x11ad5b638> _0: float
_1: float
_2: float
_3: int32
_4: int32
_5: int32
_6: int32
_7: int32
_8: float
_9: float
_10: int32
_11: list<item: float>
  child 0, item: float
_12: list<item: float>
  child 0, item: float
_13: float
_14: float
_15: int32
_16: float
_17: list<item: float>
  child 0, item: float
_18: list<item: float>
  child 0, item: float
_19: float
{code}
h2. Arrow Message

I edited ArrowPythonRunner.scala at line 163 to print out the Arrow message.

Debug code:
{code:java}
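// Append a hex dump of the next bytes on the stream to a local debug file.
val fw = new java.io.FileWriter("spark-debug.txt", true)
try {
  val buf = new Array[Byte](40000)
  // read() may fill only part of buf; the unfilled tail stays zero.
  stream.read(buf)

  fw.write("Spark reader\n")
  for (b <- buf) {
    fw.write(String.format("%02x", Byte.box(b)))
  }
  fw.write("\n")
} finally fw.close()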
{code}
Debug output (some trailing 0's included for completeness).
{code:java}
ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c4400000010000000040000000100000008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff00000102240000001400000004000000
0000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c0300001000000001000000000000000000000030000000000000000000000000000000000000000000000000000000080000000000000008000000000000000000000000000000080000000000000008000000000000001000000000000000000000000000000010000000000000000800000000000000180000000000000000000000000000001800000000000000080000000000000020000000000000000000000000000000200000000000000008000000000000002800000000000000000000000000000028000000000000000800000000000000300000000000000000000000000000003000000000000000080000000000000038000000000000000000000000000000380000000000000008000000000000004000000000000000000000000000000040000000000000000800000000000000480000000000000000000000000000004800000000000000080000000000000050000000000000000000000000000000500000000000000008000000000000005800000000000000000000000000000058000000000000000800000000000000600000000000000000000000000000006000000000000000100000000000000070000000000000000000000000000000700000000000000008000000000000007800000000000000000000000000000078000000000000001000000000000000880000000000000000000000000000008800000000000000080000000000000090000000000000000000000000000000900000000000000008000000000000009800000000000000000000000000000098000000000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c8000000000000001000000000000000
d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f000000000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
{code}
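As a reading aid for the dump above, here is a minimal Python sketch that decodes the first eight bytes of the two message prefixes visible in it. It assumes the prefix is a pair of little-endian 32-bit integers, which is an assumption about the stream framing rather than something confirmed in this report; the helper name decode_prefix is hypothetical. Read as a signed 32-bit value, the leading ffffffff is -1. ByteBuffer.allocate throws IllegalArgumentException for a negative capacity, so a reader that treats those first four bytes as a length could plausibly fail the way the stack trace shows.

```python
import struct

# First 8 bytes of the two message prefixes visible in the hex dump above.
SCHEMA_PREFIX = bytes.fromhex("ffffffff90040000")
BATCH_PREFIX = bytes.fromhex("ffffffffd8040000")


def decode_prefix(prefix):
    """Return the two little-endian signed 32-bit integers in an 8-byte prefix.

    Hypothetical helper: treating the prefix as two int32 values is an assumption.
    """
    return struct.unpack("<ii", prefix)


print(decode_prefix(SCHEMA_PREFIX))  # (-1, 1168)
print(decode_prefix(BATCH_PREFIX))   # (-1, 1240)
```

If the second integer is the real message length, the first four bytes act as a marker rather than a size, which is worth checking against the Arrow versions on the Python and JVM sides.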
 
h2. Related Bugs

I have also hit a related bug in which the schema of the input Arrow message was transmitted incorrectly. In that case, the input schema should have been <long, float, long> but was transmitted as <long, long, float>. As a result, the float column was interpreted as a long. Equivalent C code to illustrate the behavior:
{code:java}
long reinterpret(double floating_point_number) {
  return *(long*)(&floating_point_number);
}
{code}
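The same reinterpretation can be reproduced in Python with the standard struct module. This is only a sketch to show how large and meaningless the resulting integers are; the function name is hypothetical, and it assumes an 8-byte IEEE 754 double and a signed 64-bit long.

```python
import struct


def reinterpret_double_as_long(value):
    """Pack a float as an 8-byte IEEE 754 double, then read those same bytes
    back as a signed 64-bit integer (no numeric conversion takes place)."""
    return struct.unpack("<q", struct.pack("<d", value))[0]


print(reinterpret_double_as_long(1.0))  # 4607182418800017408 (0x3FF0000000000000)
print(reinterpret_double_as_long(0.0))  # 0
```

A column of small floats read this way shows up as very large integers, which is a quick way to recognise the symptom in the data.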
I worked around this bug by making all 3 columns float and converting them to long within the UDF via Pandas Series.apply(np.int). Strangely, Column.astype('float') didn't seem to have any effect; I had to make the columns float at the source.

Along the way, I had trouble with [Python's dict keys being non-deterministic|https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic]. This led to columns being passed to GroupedData.agg() in a different order in each worker and driver process. I've mitigated this by explicitly ordering the columns before sending them to agg. I don't think this is an issue anymore, but I'm calling it out just in case.
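A minimal sketch of that kind of mitigation, in plain Python so it runs without Spark. The names ordered_aggregations and metrics are hypothetical and the string values stand in for UDF column expressions; the actual code used for this report is not shown here.

```python
def ordered_aggregations(aggregations):
    """Return (name, expression) pairs sorted by name, so every process
    sees the aggregations in the same order whatever the dict order is."""
    return [(name, aggregations[name]) for name in sorted(aggregations)]


# Placeholder strings stand in for the real UDF column expressions.
metrics = {"recall": "recall_udf", "accuracy": "accuracy_udf", "count": "count_udf"}

print([name for name, _ in ordered_aggregations(metrics)])
# ['accuracy', 'count', 'recall']
```

In PySpark, each pair could then be turned into expr.alias(name) and the resulting list passed to GroupedData.agg(*columns).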

 

 


> Failure when returning a value from multiple Pandas UDFs
> --------------------------------------------------------
>
>                 Key: SPARK-30063
>                 URL: https://issues.apache.org/jira/browse/SPARK-30063
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3, 2.4.4
>         Environment: Happens on Mac & Ubuntu (Docker). Seems to happen on both 2.4.3 and 2.4.4
>            Reporter: Tim Kellogg
>            Priority: Major
>         Attachments: spark-debug.txt
>
>
> I have 20 Pandas UDFs that I'm trying to evaluate all at the same time.
>  * PandasUDFType.GROUPED_AGG
>  * 3 columns in the input data frame being serialized over Arrow to Python worker. See below for clarification.
>  * All functions take 2 parameters, some combination of the 3 received as Arrow input.
>  * Varying return types, see details below.
> _*I get an IllegalArgumentException on the Scala side of the worker when deserializing from Python.*_
> h2. Query Plan at Point of Failure
> Right before the failure, I printed out the explain(True) output.
>  
> {code:java}
> == Parsed Logical Plan ==
> 'Project [structstojson(named_struct(), None) AS key#269, unresolvedalias('accuracy, None), unresolvedalias('areaUnderPR, None), unresolvedalias('areaUnderROC, None), unresolvedalias('confusionMatrix, None), unresolvedalias('count, None), unresolvedalias('f1Score, None), unresolvedalias('f1Score_0, None), unresolvedalias('positiveClassRate, None), unresolvedalias('prCurve, None), unresolvedalias('precision, None), unresolvedalias('precision_0, None), unresolvedalias('predictionRate, None), unresolvedalias('recall, None), unresolvedalias('rocCurve, None), unresolvedalias('specificity, None)]
> +- Aggregate [udf(cast(label#146 as double), cast(prediction#15 as double)) AS accuracy#232, _auc_pr(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderPR#233, udf(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderROC#225, array(array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double))), array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double)))) AS confusionMatrix#238, _count(cast(label#146 as double)) AS count#234, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score#231, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score_0#228, _rate(cast(label#146 as double)) AS positiveClassRate#227, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS prCurve#230, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision#236, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision_0#235, _rate(cast(prediction#15 as double)) AS predictionRate#237, udf(cast(label#146 as double), cast(prediction#15 as double)) AS recall#229, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS rocCurve#239, udf(cast(label#146 as double), cast(prediction#15 as double)) AS specificity#226]
>    +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146, test AS customer#156, foo AS solution#157, bar AS insight#158, model AS model_name#159, 1.0 AS version#160, model1 AS model_id#161, current_timestamp() AS timestamp#162]
>       +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146]
>          +- Join Inner, (encounterID#13 = encounterID#145)
>             :- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
>             :  +- Filter ((false || NOT test#40) = false)
>             :     +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (true && (dim1#11 = foo)) AS test#40]
>             :        +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
>             :           +- Join Cross
>             :              :- LogicalRDD [dim1#11, dim2#12, encounterID#13, label#14, prediction#15, probability#16], false
>             :              +- LogicalRDD [model_id#23], false
>             +- Project [encounterID#145, label#146]
>                +- Join Cross
>                   :- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, prediction#147, probability#148], false
>                   +- LogicalRDD [model_id#23], false
> == Analyzed Logical Plan ==
> key: string, accuracy: float, areaUnderPR: float, areaUnderROC: float, confusionMatrix: array<array<int>>, count: int, f1Score: float, f1Score_0: float, positiveClassRate: int, prCurve: struct<x:array<float>,y:array<float>>, precision: float, precision_0: float, predictionRate: int, recall: float, rocCurve: struct<x:array<float>,y:array<float>>, specificity: float
> Project [structstojson(named_struct(), Some(America/Los_Angeles)) AS key#269, accuracy#232, areaUnderPR#233, areaUnderROC#225, confusionMatrix#238, count#234, f1Score#231, f1Score_0#228, positiveClassRate#227, prCurve#230, precision#236, precision_0#235, predictionRate#237, recall#229, rocCurve#239, specificity#226]
> +- Aggregate [udf(cast(label#146 as double), cast(prediction#15 as double)) AS accuracy#232, _auc_pr(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderPR#233, udf(cast(label#146 as double), cast(probability#16 as double)) AS areaUnderROC#225, array(array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double))), array(udf(cast(label#146 as double), cast(prediction#15 as double)), udf(cast(label#146 as double), cast(prediction#15 as double)))) AS confusionMatrix#238, _count(cast(label#146 as double)) AS count#234, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score#231, udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score_0#228, _rate(cast(label#146 as double)) AS positiveClassRate#227, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS prCurve#230, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision#236, udf(cast(label#146 as double), cast(prediction#15 as double)) AS precision_0#235, _rate(cast(prediction#15 as double)) AS predictionRate#237, udf(cast(label#146 as double), cast(prediction#15 as double)) AS recall#229, named_struct(x, udf(cast(label#146 as double), cast(probability#16 as double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) AS rocCurve#239, udf(cast(label#146 as double), cast(prediction#15 as double)) AS specificity#226]
>    +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146, test AS customer#156, foo AS solution#157, bar AS insight#158, model AS model_name#159, 1.0 AS version#160, model1 AS model_id#161, current_timestamp() AS timestamp#162]
>       +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, probability#16, model_id#23, label#146]
>          +- Join Inner, (encounterID#13 = encounterID#145)
>             :- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
>             :  +- Filter ((false || NOT test#40) = false)
>             :     +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (true && (dim1#11 = foo)) AS test#40]
>             :        +- Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23]
>             :           +- Join Cross
>             :              :- LogicalRDD [dim1#11, dim2#12, encounterID#13, label#14, prediction#15, probability#16], false
>             :              +- LogicalRDD [model_id#23], false
>             +- Project [encounterID#145, label#146]
>                +- Join Cross
>                   :- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, prediction#147, probability#148], false
>                   +- LogicalRDD [model_id#23], false
> == Optimized Logical Plan ==
> Aggregate [{} AS key#269, udf(label#146, prediction#15) AS accuracy#232, _auc_pr(label#146, probability#16) AS areaUnderPR#233, udf(label#146, probability#16) AS areaUnderROC#225, array(array(udf(label#146, prediction#15), udf(label#146, prediction#15)), array(udf(label#146, prediction#15), udf(label#146, prediction#15))) AS confusionMatrix#238, _count(label#146) AS count#234, udf(label#146, prediction#15) AS f1Score#231, udf(label#146, prediction#15) AS f1Score_0#228, _rate(label#146) AS positiveClassRate#227, named_struct(x, udf(label#146, probability#16), y, udf(label#146, probability#16)) AS prCurve#230, udf(label#146, prediction#15) AS precision#236, udf(label#146, prediction#15) AS precision_0#235, _rate(prediction#15) AS predictionRate#237, udf(label#146, prediction#15) AS recall#229, named_struct(x, udf(label#146, probability#16), y, udf(label#146, probability#16)) AS rocCurve#239, udf(label#146, prediction#15) AS specificity#226]
> +- Project [prediction#15, probability#16, label#146]
>    +- Join Inner, (encounterID#13 = encounterID#145)
>       :- Project [encounterID#13, prediction#15, probability#16]
>       :  +- Filter ((isnotnull(test#40) && (NOT test#40 = false)) && isnotnull(encounterID#13))
>       :     +- InMemoryRelation [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, test#40], StorageLevel(disk, memory, deserialized, 1 replicas)
>       :           +- *(2) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (dim1#11 = foo) AS test#40]
>       :              +- CartesianProduct
>       :                 :- *(1) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16]
>       :                 :  +- Scan ExistingRDD[dim1#11,dim2#12,encounterID#13,label#14,prediction#15,probability#16]
>       :                 +- Scan ExistingRDD[model_id#23]
>       +- Join Cross
>          :- Project [encounterID#145, label#146]
>          :  +- Filter isnotnull(encounterID#145)
>          :     +- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, prediction#147, probability#148], false
>          +- Project
>             +- LogicalRDD [model_id#23], false
> 
> == Physical Plan ==
> !AggregateInPandas [udf(label#146, prediction#15), _auc_pr(label#146, probability#16), udf(label#146, probability#16), udf(label#146, prediction#15), udf(label#146, prediction#15), udf(label#146, prediction#15), udf(label#146, prediction#15), _count(label#146), udf(label#146, prediction#15), udf(label#146, prediction#15), _rate(label#146), udf(label#146, probability#16), udf(label#146, probability#16), udf(label#146, prediction#15), udf(label#146, prediction#15), _rate(prediction#15), udf(label#146, prediction#15), udf(label#146, probability#16), udf(label#146, probability#16), udf(label#146, prediction#15)], [{} AS key#269, udf(label, prediction)#201 AS accuracy#232, _auc_pr(label, probability)#209 AS areaUnderPR#233, udf(label, probability)#208 AS areaUnderROC#225, array(array(udf(label, prediction)#213, udf(label, prediction)#214), array(udf(label, prediction)#215, udf(label, prediction)#216)) AS confusionMatrix#238, _count(label)#210 AS count#234, udf(label, prediction)#206 AS f1Score#231, udf(label, prediction)#207 AS f1Score_0#228, _rate(label)#212 AS positiveClassRate#227, named_struct(x, udf(label, probability)#219, y, udf(label, probability)#220) AS prCurve#230, udf(label, prediction)#202 AS precision#236, udf(label, prediction)#203 AS precision_0#235, _rate(prediction)#211 AS predictionRate#237, udf(label, prediction)#204 AS recall#229, named_struct(x, udf(label, probability)#217, y, udf(label, probability)#218) AS rocCurve#239, udf(label, prediction)#205 AS specificity#226]
> +- Exchange SinglePartition
>    +- *(4) Project [prediction#15, probability#16, label#146]
>       +- *(4) BroadcastHashJoin [encounterID#13], [encounterID#145], Inner, BuildLeft
>          :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>          :  +- *(1) Project [encounterID#13, prediction#15, probability#16]
>          :     +- *(1) Filter ((isnotnull(test#40) && (NOT test#40 = false)) && isnotnull(encounterID#13))
>          :        +- InMemoryTableScan [encounterID#13, prediction#15, probability#16, test#40], [isnotnull(test#40), (NOT test#40 = false), isnotnull(encounterID#13)]
>          :              +- InMemoryRelation [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, test#40], StorageLevel(disk, memory, deserialized, 1 replicas)
>          :                    +- *(2) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16, model_id#23, (dim1#11 = foo) AS test#40]
>          :                       +- CartesianProduct
>          :                          :- *(1) Project [dim1#11, dim2#12, encounterID#13, prediction#15, probability#16]
>          :                          :  +- Scan ExistingRDD[dim1#11,dim2#12,encounterID#13,label#14,prediction#15,probability#16]
>          :                          +- Scan ExistingRDD[model_id#23]
>          +- CartesianProduct
>             :- *(2) Project [encounterID#145, label#146]
>             :  +- *(2) Filter isnotnull(encounterID#145)
>             :     +- Scan ExistingRDD[dim1#143,dim2#144,encounterID#145,label#146,prediction#147,probability#148]
>             +- *(3) Project
>                +- Scan ExistingRDD[model_id#23]
> {code}
> h2. Related Bugs
> I have also hit a related bug in which the schema of the input Arrow message was transmitted incorrectly. In that case, the input schema should have been <long, float, long> but was transmitted as <long, long, float>. As a result, the float column was interpreted as a long. The following C code illustrates the equivalent behavior:
> {code:java}
> /* Reads the bits of a double as if they were a long (assumes both are 64 bits wide). */
> long reinterpret(double floating_point_number) {
>   return *(long*)(&floating_point_number);
> }
> {code}
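> The same reinterpretation can be reproduced in Python, which may help when checking whether a suspicious long value in a UDF is really a float's bit pattern. This is only a minimal sketch using the standard struct module; the function name reinterpret_double_as_long is made up for illustration and is not part of Spark or Arrow.

```python
import struct


def reinterpret_double_as_long(value: float) -> int:
    """Return the signed 64-bit integer that shares its bit pattern with a float64."""
    # Pack as a little-endian IEEE 754 double, then read the same 8 bytes as an int64.
    (as_long,) = struct.unpack("<q", struct.pack("<d", value))
    return as_long


print(reinterpret_double_as_long(0.0))  # 0
print(reinterpret_double_as_long(1.0))  # 4607182418800017408 (0x3FF0000000000000)
```

> Small probabilities and labels such as 1.0 therefore show up as very large integers, which is one way to recognize swapped columns.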
> I worked around this bug by making all 3 columns float and converting them to long within the UDF via Pandas Series.apply(np.int). Strangely, Column.astype('float') didn't seem to have any effect; I had to make the columns float at the source.
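> One caveat with carrying longs as floats: a float64 represents integers exactly only up to 2^53 in magnitude, so the workaround is lossless only for IDs and counts within that range. The sketch below checks this with the standard library alone; the helper name survives_float_round_trip is hypothetical, and it stands in for the float-at-the-source, cast-in-the-UDF round trip rather than reproducing the actual Spark code.

```python
import struct

MAX_EXACT = 2 ** 53  # float64 has a 53-bit significand


def survives_float_round_trip(n: int) -> bool:
    """True if integer n is unchanged after being stored as a float64 and cast back."""
    # Serialize as an 8-byte double and read it back, as a float column would carry it.
    (as_double,) = struct.unpack("<d", struct.pack("<d", float(n)))
    return int(as_double) == n


print(survives_float_round_trip(12345))          # True
print(survives_float_round_trip(MAX_EXACT))      # True
print(survives_float_round_trip(MAX_EXACT + 1))  # False
```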
> Along the way, I had trouble with [Python's dict keys being non-deterministic|https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic]. This led to columns being passed to GroupedData.agg() in a different order in each worker and driver process. I've mitigated this by explicitly ordering the columns before sending them to agg. I don't think this is an issue anymore, but I'm calling it out just in case.
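> The ordering mitigation can be sketched roughly as follows. The metric names and placeholder expression strings are hypothetical; in the real job each value would be a PySpark Column built from a Pandas UDF, and the helper ordered_aggregations is an illustrative name, not the code used in this report.

```python
def ordered_aggregations(aggs: dict) -> list:
    """Return (output_name, expression) pairs sorted by output name."""
    # Sorting by name gives the same order in every process,
    # regardless of how the dict happens to iterate.
    return sorted(aggs.items(), key=lambda item: item[0])


# Placeholder strings stand in for Column expressions such as some_udf(...).alias(name).
aggs = {"recall": "recall_expr", "accuracy": "accuracy_expr", "f1Score": "f1_expr"}
ordered = ordered_aggregations(aggs)
print([name for name, _ in ordered])  # ['accuracy', 'f1Score', 'recall']
```

> The sorted expressions would then be unpacked into the aggregation call, for example df.groupBy(...).agg(*[expr for _, expr in ordered]).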
>  
>  


