Posted to issues@spark.apache.org by "Ruifeng Zheng (Jira)" <ji...@apache.org> on 2023/01/11 03:18:00 UTC

[jira] [Comment Edited] (SPARK-41971) `toPandas` should support duplicate field names when arrow-optimization is on

    [ https://issues.apache.org/jira/browse/SPARK-41971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17657059#comment-17657059 ] 

Ruifeng Zheng edited comment on SPARK-41971 at 1/11/23 3:17 AM:
----------------------------------------------------------------

I think that is because something is wrong in `ArrowConverter`.

In Spark, a schema is just a StructType, but in Arrow that is not the case: a schema is a separate class, distinct from a data type. This difference may be the cause.
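
For illustration, here is a minimal PyArrow-only sketch (not the Spark code path) of the two points above: in Arrow a Schema is a class separate from the data types, and Arrow itself appears to accept duplicate field names in a struct, including across an IPC stream round trip. If that holds, the malformed field metadata would have to come from how the batches are written on the JVM side.

{code:python}
import pyarrow as pa

# In Arrow, a Schema is not a DataType; a struct type is.
# (In Spark, by contrast, the schema of a DataFrame is itself a StructType.)
schema = pa.schema([pa.field("v", pa.int64()), pa.field("v", pa.int64())])
struct_type = pa.struct([pa.field("v", pa.int64()), pa.field("v", pa.int64())])
print(isinstance(struct_type, pa.DataType))  # True
print(isinstance(schema, pa.DataType))       # False

# A struct array with duplicate field names, round-tripped through the IPC
# stream format -- the same step that fails with "Ran out of field metadata"
# when the batch is produced by Spark.
struct_arr = pa.StructArray.from_arrays(
    [pa.array([1]), pa.array([1])], names=["v", "v"]
)
batch = pa.RecordBatch.from_arrays([struct_arr], names=["s"])

sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
for read_back in pa.ipc.open_stream(sink.getvalue()):
    print(read_back.schema)
{code}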



> `toPandas` should support duplicate field names when arrow-optimization is on
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-41971
>                 URL: https://issues.apache.org/jira/browse/SPARK-41971
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.4.0
>            Reporter: Ruifeng Zheng
>            Priority: Minor
>
> toPandas supports duplicate column names, but for a struct column it does not support duplicate field names.
> {code:python}
> In [27]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
> In [28]: spark.sql("select 1 v, 1 v").toPandas()
> Out[28]: 
>    v  v
> 0  1  1
> In [29]: spark.sql("select struct(1 v, 1 v)").toPandas()
> Out[29]: 
>   struct(1 AS v, 1 AS v)
> 0                 (1, 1)
> In [30]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
> In [31]: spark.sql("select 1 v, 1 v").toPandas()
> Out[31]: 
>    v  v
> 0  1  1
> In [32]: spark.sql("select struct(1 v, 1 v)").toPandas()
> /Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/pandas/conversion.py:204: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
>   Ran out of field metadata, likely malformed
>   warn(msg)
> ---------------------------------------------------------------------------
> ArrowInvalid                              Traceback (most recent call last)
> Cell In[32], line 1
> ----> 1 spark.sql("select struct(1 v, 1 v)").toPandas()
> File ~/Dev/spark/python/pyspark/sql/pandas/conversion.py:143, in PandasConversionMixin.toPandas(self)
>     141 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
>     142 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
> --> 143 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
>     144     split_batches=self_destruct
>     145 )
>     146 if len(batches) > 0:
>     147     table = pyarrow.Table.from_batches(batches)
> File ~/Dev/spark/python/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
>     356             results.append(batch_or_indices)
>     357     else:
> --> 358         results = list(batch_stream)
>     359 finally:
>     360     # Join serving thread and raise any exceptions from collectAsArrowToPython
>     361     jsocket_auth_server.getResult()
> File ~/Dev/spark/python/pyspark/sql/pandas/serializers.py:55, in ArrowCollectSerializer.load_stream(self, stream)
>      50 """
>      51 Load a stream of un-ordered Arrow RecordBatches, where the last iteration yields
>      52 a list of indices that can be used to put the RecordBatches in the correct order.
>      53 """
>      54 # load the batches
> ---> 55 for batch in self.serializer.load_stream(stream):
>      56     yield batch
>      58 # load the batch order indices or propagate any error that occurred in the JVM
> File ~/Dev/spark/python/pyspark/sql/pandas/serializers.py:98, in ArrowStreamSerializer.load_stream(self, stream)
>      95 import pyarrow as pa
>      97 reader = pa.ipc.open_stream(stream)
> ---> 98 for batch in reader:
>      99     yield batch
> File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/ipc.pxi:638, in __iter__()
> File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/ipc.pxi:674, in pyarrow.lib.RecordBatchReader.read_next_batch()
> File ~/.dev/miniconda3/envs/spark_dev/lib/python3.9/site-packages/pyarrow/error.pxi:100, in pyarrow.lib.check_status()
> ArrowInvalid: Ran out of field metadata, likely malformed
> {code}


