Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/03/22 09:57:12 UTC

[GitHub] [spark] viirya commented on a change in pull request #24177: [SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in Scalar Pandas UDF.

viirya commented on a change in pull request #24177: [SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in Scalar Pandas UDF.
URL: https://github.com/apache/spark/pull/24177#discussion_r268101587
 
 

 ##########
 File path: python/pyspark/serializers.py
 ##########
 @@ -378,6 +379,29 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
     Serializer used by Python worker to evaluate Pandas UDFs
     """
 
+    def __init__(self, timezone, safecheck, assign_cols_by_name, df_for_struct=False):
+        super(ArrowStreamPandasUDFSerializer, self) \
+            .__init__(timezone, safecheck, assign_cols_by_name)
+        self._df_for_struct = df_for_struct
+
+    def arrow_to_pandas(self, arrow_column, data_type):
+        from pyspark.sql.types import StructType, \
+            _arrow_column_to_pandas, _check_dataframe_localize_timestamps
+
+        if self._df_for_struct and type(data_type) == StructType:
+            import pandas as pd
+            import pyarrow as pa
+            column_arrays = zip(*[[chunk.field(i)
+                                   for i in range(chunk.type.num_children)]
+                                  for chunk in arrow_column.data.iterchunks()])
+            series = [_arrow_column_to_pandas(pa.column(field.name, pa.chunked_array(arrays)),
+                                              field.dataType)
+                      for arrays, field in zip(column_arrays, data_type)]
+            s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone)
+        else:
+            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type)
 
 Review comment:
   Will this create a new serializer each time `arrow_to_pandas` is called?
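
   A hedged note on the question above: in Python, `super(Cls, self)` returns a lightweight proxy bound to the existing instance and does not run `__init__`, so the call in the `else` branch should not construct a second serializer. The sketch below illustrates this with hypothetical stand-in classes (`BaseSerializer`, `UDFSerializer`) and a plain `dict` standing in for a struct column; these are assumptions for illustration, not the PySpark classes from the diff.

```python
class BaseSerializer(object):
    """Hypothetical stand-in for the parent serializer (not the PySpark class)."""

    instances_created = 0  # counts how many times __init__ actually runs

    def __init__(self, timezone):
        BaseSerializer.instances_created += 1
        self._timezone = timezone

    def arrow_to_pandas(self, column):
        # Report which object handled the call.
        return ("base", id(self), column)


class UDFSerializer(BaseSerializer):
    """Hypothetical stand-in for the subclass added in the diff."""

    def __init__(self, timezone, df_for_struct=False):
        super(UDFSerializer, self).__init__(timezone)
        self._df_for_struct = df_for_struct

    def arrow_to_pandas(self, column):
        # A dict stands in for a struct-typed column in this sketch.
        if self._df_for_struct and isinstance(column, dict):
            return ("struct", id(self), column)
        # super() builds a proxy around `self`; no new serializer is constructed.
        return super(UDFSerializer, self).arrow_to_pandas(column)


serializer = UDFSerializer("UTC", df_for_struct=True)
results = [serializer.arrow_to_pandas(value) for value in (1, {"a": 1}, 2)]
```

   After three calls, `BaseSerializer.instances_created` is still 1 and every result carries the `id` of the same `serializer` object, which suggests the per-call cost is only the `super()` proxy and the local imports, not a new serializer.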

