Posted to issues@spark.apache.org by "Bryan Cutler (JIRA)" <ji...@apache.org> on 2019/05/07 21:49:00 UTC
[jira] [Resolved] (SPARK-23961) pyspark toLocalIterator throws an exception
[ https://issues.apache.org/jira/browse/SPARK-23961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bryan Cutler resolved SPARK-23961.
----------------------------------
Resolution: Fixed
Fix Version/s: 3.0.0
Issue resolved by pull request 24070
[https://github.com/apache/spark/pull/24070]
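With this change, partially consuming the iterator should no longer log the "Error while sending iterator" failure described below. A minimal sketch of the expected usage on Spark 3.0.0+ (the DataFrame, partition count, and slice size here are illustrative, not taken from the report):
{quote}import itertools
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 100000, numPartitions=10)  # illustrative data; any multi-partition DataFrame works

it = df.toLocalIterator()
head = list(itertools.islice(it, 20))  # read only part of the data
it = None  # drop the iterator without draining it; with the fix this should no longer error out
print(head[:5])
{quote}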
> pyspark toLocalIterator throws an exception
> -------------------------------------------
>
> Key: SPARK-23961
> URL: https://issues.apache.org/jira/browse/SPARK-23961
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.0.2, 2.1.2, 2.2.1, 2.3.0
> Reporter: Michel Lemay
> Assignee: Bryan Cutler
> Priority: Minor
> Labels: DataFrame, pyspark
> Fix For: 3.0.0
>
>
> Given a DataFrame, call toLocalIterator on it. If we do not consume all records, it throws:
> {quote}ERROR PythonRDD: Error while sending iterator
> java.net.SocketException: Connection reset by peer: socket write error
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:497)
> at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
> at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:509)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:705)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
> at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:705)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
> {quote}
>
> To reproduce, here is a simple pyspark shell script that shows the error:
> {quote}import itertools
> df = spark.read.parquet("large parquet folder").cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None # Make the iterator go out of scope. Throws here.
> {quote}
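> 
> For readers without a large parquet dataset at hand, a self-contained variant of the same reproduction (the range size and partition count are illustrative assumptions):
> {quote}import itertools
> df = spark.range(0, 1000000, numPartitions=100).cache()
> print(df.count())
> b = df.toLocalIterator()
> print(len(list(itertools.islice(b, 20))))
> b = None # Drop the iterator before consuming every partition.
> {quote}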
>
> Observations:
> * Consuming all records does not throw. Taking only a subset of the partitions creates the error.
> * In another experiment, doing the same on a regular RDD works if we cache/materialize it. If we do not cache the RDD, it throws similarly (see the sketch after this list).
> * It works in the Scala shell.
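> 
> A rough sketch of that RDD experiment (data, partition count, and slice size are illustrative assumptions, not the original setup):
> {quote}import itertools
> rdd = spark.sparkContext.parallelize(range(1000000), 100)
> # Cached and materialized RDD: partial consumption did not throw in the experiment.
> rdd.cache()
> rdd.count()
> it = rdd.toLocalIterator()
> print(len(list(itertools.islice(it, 20))))
> it = None
> # Uncached RDD: the same partial consumption reproduced the error.
> rdd.unpersist()
> it = rdd.toLocalIterator()
> print(len(list(itertools.islice(it, 20))))
> it = None
> {quote}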
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)