You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/11 02:52:40 UTC

[GitHub] [spark] eddyxu edited a comment on pull request #31735: [SPARK-34600][PYTHON][SQL] Return User-defined types from Pandas UDF

eddyxu edited a comment on pull request #31735:
URL: https://github.com/apache/spark/pull/31735#issuecomment-795810989


   @HyukjinKwon this PR has not yet supported passing UDT into pandas UDF as input parameters yet.  Not sure what severity of `the performance was pretty bad.` is though. 
   
   Btw, I did remove the `typing dispatch` approach. In this PR, other than `applying the deconstruction of a UDT` from https://github.com/apache/spark/pull/31735/files#diff-f488fb6f95fc62225d12b7895ca4a17081a7de2f6c4dc77272c7bd6694649706R166-R170, there is no per-row based detection for UDT.  
   
   ```python
           df = self.spark.range(1, 10**6, numPartitions=32)
           df = df.cache()
           df.rdd.count()
   
           @pandas_udf(ArrayType(ExampleBoxUDT()))
           def array_of_boxes(series: pd.Series) -> pd.Series:
               boxes = []
               for _, i in series.items():
                   boxes.append([ExampleBox(*([i] * 4)), ExampleBox(*([i + 1] * 4))])
               return pd.Series(boxes)
   
           @pandas_udf(ArrayType(ArrayType(FloatType())))
           def array_of_arrays(series: pd.Series) -> pd.Series:
               boxes = []
               for _, i in series.items():
                   boxes.append([[i] * 4, [i + 1] * 4])
               return pd.Series(boxes)
   
           @udf(ArrayType(ExampleBoxUDT()))
           def plain_udf(i):
               return [ExampleBox(*([i] * 4)), ExampleBox(*([i + 1] * 4))]
   
           @pandas_udf(IntegerType())
           def vectorized(series: pd.Series) -> pd.Series:
               return series + 1
   
           import time
   
           start = time.time()
           df.withColumn("vectorized", vectorized(df.id)).rdd.count()
           print(f"Vectorized Pandas UDF: {time.time() - start}")
   
           start = time.time()
           df.withColumn("boxes", array_of_boxes(df.id)).rdd.count()
           print(f"Pandas UDF + UDT: {time.time() - start}")
   
           start = time.time()
           df.withColumn("b", array_of_arrays(df.id)).rdd.count()
           print(f"Pandas UDF + No UDT: {time.time() - start}")
   
           start = time.time()
           df.withColumn("plain_udf", plain_udf(df.id)).rdd.count()
           print(f"Normal UDF: {time.time() - start}")
   
   ```
   
   It seems no matter what UDF / Pandas UDF to run, the first one after `spark.range()` is the slowest, so I measure all of the benchmarks directly after `spark.range` for fairness. Also the previous benchmark seemed messed up with `.cache()` and input size. Here is the updated results from the above code.
   
   
   UDF          |  Time
   ------------ | -------------
   Pandas UDF + UDT     |  20.1209s
   Pandas UDF + ArrayType  |    4.1171s
   Normal UDF + UDT | 20.790s
   Pandas UDF (vectorized) |  2.2172s
   
   It should be understandable that using UDT with `@pandas_udf` means that pyspark can not use vectorized computation from pandas. And the generated `pd.Series`, which is the columnar format in memory already, can not directly to construct a pyarrow array, because we need to deconstruct UDT to its `sqlType` form. 
   
   However, as demonstrated above, generating 2M UDTs takes only 20s on single machine. This is the worst case scenario for the performance comparison because there is almost no other overhead than UDT deconstruction.  
   
   But in our real-world use cases that:
   
   1. We would like to use Pandas UDF to run heavy deep learning model inference (i.e., tensorflow or pytorch) so that we can amortize model initialization overhead, as well as being able to feed model in batch.  Also, 2M boxes could takes hours to generate running on on tens of thousands of rows. 
   2. UDT implementation has domain-specific code that can not be baked in a `StructType`. Which means today, when we need to use such code, we need to glue a "StructType -> UDT" udf after a `pandas UDF`, which complicates the codebase and not necessarily  faster than this PR (basically "row 2 + 3 vs row 1" in the above table).
   
   If we use UDT in pandas UDF with cautions, it does deliver many benefits for our use cases.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org