Posted to issues@spark.apache.org by "Mike Dusenberry (JIRA)" <ji...@apache.org> on 2016/12/13 19:56:58 UTC
[jira] [Comment Edited] (SPARK-18281) toLocalIterator yields time out error on pyspark2
[ https://issues.apache.org/jira/browse/SPARK-18281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15746023#comment-15746023 ]
Mike Dusenberry edited comment on SPARK-18281 at 12/13/16 7:56 PM:
-------------------------------------------------------------------
Here's another interesting finding. The first (original) example fails with the timeout. However, if you create the DataFrame, run an action on it (such as count()), and then create the iterator, it works.
{code}
df = spark.createDataFrame([[1],[2],[3]])
it = df.toLocalIterator() # FAILS HERE
row = next(it)
{code}
{code}
df = spark.createDataFrame([[1],[2],[3]])
df.count()
it = df.toLocalIterator() # No longer fails
row = next(it)
{code}
This leads me to believe there may be something wrong with the creation of the DataFrame.
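For anyone who wants to apply the second snippet as a stopgap, here is a minimal sketch. The helper name local_iterator_after_action is hypothetical (it is not part of PySpark), and it only packages the behavior observed above on 2.0.1; it is not a confirmed fix. Note that count() runs a full job, so this can be expensive on large data.

```python
def local_iterator_after_action(df):
    """Hypothetical helper, not part of PySpark.

    Runs an action (count) on the DataFrame before asking for a local
    iterator, mirroring the second snippet above. This is only the
    workaround observed in this comment, not a root-cause fix.
    """
    # Force the DataFrame to be evaluated once before creating the iterator.
    df.count()
    # Only now create the iterator that streams rows back to the driver.
    return df.toLocalIterator()
```

Usage would look like `it = local_iterator_after_action(df)` followed by `row = next(it)`, with df created as in the snippets above.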
> toLocalIterator yields time out error on pyspark2
> -------------------------------------------------
>
> Key: SPARK-18281
> URL: https://issues.apache.org/jira/browse/SPARK-18281
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.0.1
> Environment: Ubuntu 14.04.5 LTS
> Driver: AWS M4.XLARGE
> Slaves: AWS M4.4.XLARGE
> mesos 1.0.1
> spark 2.0.1
> pyspark
> Reporter: Luke Miner
>
> I run the example straight out of the API docs for toLocalIterator and it gives a timeout exception:
> {code}
> from pyspark import SparkContext
> sc = SparkContext()
> rdd = sc.parallelize(range(10))
> [x for x in rdd.toLocalIterator()]
> {code}
> conf file:
> spark.driver.maxResultSize 6G
> spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError
> spark.executor.memory 16G
> spark.executor.uri foo/spark-2.0.1-bin-hadoop2.7.tgz
> spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.fs.s3a.buffer.dir /raid0/spark
> spark.hadoop.fs.s3n.buffer.dir /raid0/spark
> spark.hadoop.fs.s3a.connection.timeout 500000
> spark.hadoop.fs.s3n.multipart.uploads.enabled true
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
> spark.hadoop.parquet.block.size 2147483648
> spark.hadoop.parquet.enable.summary-metadata false
> spark.jars.packages com.databricks:spark-avro_2.11:3.0.1,com.amazonaws:aws-java-sdk-pom:1.10.34
> spark.local.dir /raid0/spark
> spark.mesos.coarse false
> spark.mesos.constraints priority:1
> spark.network.timeout 600
> spark.rpc.message.maxSize 500
> spark.speculation false
> spark.sql.parquet.mergeSchema false
> spark.sql.planner.externalSort true
> spark.submit.deployMode client
> spark.task.cpus 1
> Exception here:
> {code}
> ---------------------------------------------------------------------------
> timeout Traceback (most recent call last)
> <ipython-input-1-6319dd276401> in <module>()
> 2 sc = SparkContext()
> 3 rdd = sc.parallelize(range(10))
> ----> 4 [x for x in rdd.toLocalIterator()]
> /foo/spark-2.0.1-bin-hadoop2.7/python/pyspark/rdd.pyc in _load_from_socket(port, serializer)
> 140 try:
> 141 rf = sock.makefile("rb", 65536)
> --> 142 for item in serializer.load_stream(rf):
> 143 yield item
> 144 finally:
> /foo/spark-2.0.1-bin-hadoop2.7/python/pyspark/serializers.pyc in load_stream(self, stream)
> 137 while True:
> 138 try:
> --> 139 yield self._read_with_length(stream)
> 140 except EOFError:
> 141 return
> /foo/spark-2.0.1-bin-hadoop2.7/python/pyspark/serializers.pyc in _read_with_length(self, stream)
> 154
> 155 def _read_with_length(self, stream):
> --> 156 length = read_int(stream)
> 157 if length == SpecialLengths.END_OF_DATA_SECTION:
> 158 raise EOFError
> /foo/spark-2.0.1-bin-hadoop2.7/python/pyspark/serializers.pyc in read_int(stream)
> 541
> 542 def read_int(stream):
> --> 543 length = stream.read(4)
> 544 if not length:
> 545 raise EOFError
> /usr/lib/python2.7/socket.pyc in read(self, size)
> 378 # fragmentation issues on many platforms.
> 379 try:
> --> 380 data = self._sock.recv(left)
> 381 except error, e:
> 382 if e.args[0] == EINTR:
> timeout: timed out
> {code}
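The traceback above ends in a socket read (stream.read(4) inside read_int) raising "timeout: timed out". The following standalone sketch reproduces that general failure mode with the standard library only: a client socket with a timeout tries to read a 4-byte length from a peer that never writes. It is not PySpark's code; the function name, the local listener, and the 0.2 second timeout are all assumptions made for illustration.

```python
import socket


def read_length_with_timeout(timeout_seconds=0.2):
    """Illustrative only: read 4 bytes from a peer that never sends data."""
    # A listening socket that never writes stands in for a peer that has
    # not produced any data by the time the reader gives up.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server.listen(1)
    port = server.getsockname()[1]

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.settimeout(timeout_seconds)  # assumed value, chosen for the demo
    stream = None
    try:
        client.connect(("127.0.0.1", port))
        stream = client.makefile("rb", 65536)
        try:
            # Same shape as read_int in the traceback: block on a 4-byte read.
            stream.read(4)
        except socket.timeout:
            return "timed out"
        return "data received"
    finally:
        if stream is not None:
            stream.close()
        client.close()
        server.close()
```

Calling read_length_with_timeout() returns "timed out" because nothing is ever written to the connection. Whether the same thing happens in the reported case (the JVM side not writing before the Python side's timeout expires) is not established by the traceback alone.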