You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/19 15:39:10 UTC

[GitHub] [spark] WeichenXu123 opened a new pull request #24643: [SPARK-26412][PySpark][SQL][WIP] Allow Pandas UDF to take an iterator of pd.DataFrames

WeichenXu123 opened a new pull request #24643: [SPARK-26412][PySpark][SQL][WIP] Allow Pandas UDF to take an iterator of pd.DataFrames
URL: https://github.com/apache/spark/pull/24643
 
 
   ## What changes were proposed in this pull request?
   
   Allow Pandas UDF to take an iterator of pd.DataFrames
   I haven't add unit tests, but manually tests show it works fine. So it is ready for first pass review.
   We can test several typical cases:
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import pandas_udf, PandasUDFType
   from pyspark.sql.functions import udf
   from pyspark.taskcontext import TaskContext
   
   df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])
   
   @pandas_udf("int", PandasUDFType.SCALAR_ITER)
   def fi1(it):
       pid = TaskContext.get().partitionId()
       print("DBG: fi1: do init stuff, partitionId=" + str(pid))
       for batch in it:
           yield batch + 100
       print("DBG: fi1: do close stuff, partitionId=" + str(pid))
   
   @pandas_udf("int", PandasUDFType.SCALAR_ITER)
   def fi2(it):
       pid = TaskContext.get().partitionId()
       print("DBG: fi2: do init stuff, partitionId=" + str(pid))
       for batch in it:
           yield batch + 10000
       print("DBG: fi2: do close stuff, partitionId=" + str(pid))
   
   @pandas_udf("int", PandasUDFType.SCALAR_ITER)
   def fi3(it):
       pid = TaskContext.get().partitionId()
       print("DBG: fi3: do init stuff, partitionId=" + str(pid))
       for x, y in it:
           yield x + y * 10 + 100000
       print("DBG: fi3: do close stuff, partitionId=" + str(pid))
   
   
   @pandas_udf("int", PandasUDFType.SCALAR)
   def fp1(x):
       return x + 1000
   
   @udf("int")
   def fu1(x):
       return x + 10
   
   # test running sql udf/pandas udf/pandas iter udf at the same time.
   # Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
   # and `fu1("a")`, `fp1("a")` will generate another two separate plans.
   df.select(fu1("a"), fp1("a"), fi1("a"), fi2("b"), fi3("a", "b")).show()
   
   # test chain two pandas iter udf together
   # Note this case `fi2(fi1("a"))` will generate only one plan
   # Also note the init stuff/close stuff calling order will be like:
   # DBG: fi2: do init stuff, partitionId=0
   # DBG: fi1: do init stuff, partitionId=0
   # DBG: fi1: do close stuff, partitionId=0
   # DBG: fi2: do close stuff, partitionId=0
   df.select(fi2(fi1("a"))).show()
   
   # test more complex chain
   # Note this case `fi1("a"), fi2("a")` will generate one plan,
   # and `fi3(fi1_output, fi2_output)` will generate another plan
   df.select(fi3(fi1("a"), fi2("a"))).show()
   ```
   
   ## How was this patch tested?
   
   To be added.
   
   Please review http://spark.apache.org/contributing.html before opening a pull request.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org