Posted to issues@spark.apache.org by "nolan liou (Jira)" <ji...@apache.org> on 2021/05/25 08:50:00 UTC

[jira] [Created] (SPARK-35512) pyspark partitionBy may encounter 'OverflowError: cannot convert float infinity to integer'

nolan liou created SPARK-35512:
----------------------------------

             Summary: pyspark partitionBy may encounter 'OverflowError: cannot convert float infinity to integer'
                 Key: SPARK-35512
                 URL: https://issues.apache.org/jira/browse/SPARK-35512
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.2
            Reporter: nolan liou


h2. Code sample
{code:python}
# pyspark
rdd = ...
new_rdd = rdd.partitionBy(64){code}
An OverflowError is raised when the {color:#ff0000}input file is large{color} and the {color:#ff0000}executor memory{color} is not large enough, so the Python worker's used memory stays above its spill limit.
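For context, the kind of job that can hit this is sketched below. This is a hypothetical reproduction, not taken from the report: the data volume and the spark.executor.memory / spark.python.worker.memory values are assumptions and would need tuning so that the Python worker actually stays above its spill limit on a given cluster.
{code:python}
# Hypothetical reproduction sketch (not from the report): keep get_used_memory()
# above the worker's spill limit so add_shuffle_key spills on every check and
# keeps growing `batch`. All sizes and configs below are illustrative.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("SPARK-35512-repro")
        .set("spark.executor.memory", "1g")           # small executor heap (assumed)
        .set("spark.python.worker.memory", "64m"))    # low spill limit for the Python worker (assumed)
sc = SparkContext(conf=conf)

# Many small key/value records; the exact volume needed depends on the cluster.
rdd = sc.parallelize(range(100_000_000), 16).map(lambda i: (i, "x" * 64))
rdd.partitionBy(64).count()   # may fail with "OverflowError: cannot convert float infinity to integer"
{code}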
h2. Error information
{code:java}
TaskSetManager: Lost task 312.0 in stage 1.0 (TID 748, 11.4.137.5, executor 83): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark3/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/spark3/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/spark3/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/spark3/python/lib/pyspark.zip/pyspark/rdd.py", line 1899, in add_shuffle_key
OverflowError: cannot convert float infinity to integer
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:156)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:130)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1420)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748){code}
h2. Spark code

 [https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L2072]
{code:python}
            for k, v in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
                c += 1

                # check used memory and avg size of chunk of objects
                if (c % 1000 == 0 and get_used_memory() > limit
                        or c > batch):
                    n, size = len(buckets), 0
                    for split in list(buckets.keys()):
                        yield pack_long(split)
                        d = outputSerializer.dumps(buckets[split])
                        del buckets[split]
                        yield d
                        size += len(d)

                    avg = int(size / n) >> 20
                    # let 1M < avg < 10M
                    if avg < 1:
                        batch *= 1.5
                    elif avg > 10:
                        batch = max(int(batch / 1.5), 1)
                    c = 0
{code}
h2. Explanation

*`batch`* may grow to infinity: while `*get_used_memory() > limit*` stays true, the spill branch runs every 1000 records, and whenever the average chunk size is below 1 MB, `*batch *= 1.5*` is applied again. `*batch*` becomes a float, eventually reaches `*inf*`, and the later `*int(batch / 1.5)*` inside `*max(int(batch / 1.5), 1)*` then raises the OverflowError.
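The float path can be shown in isolation. The sketch below is plain Python, not PySpark itself; the starting value and the loop are only meant to mirror the repeated `*batch *= 1.5*` growth described above.
{code:python}
# Standalone illustration (assumed values, not PySpark itself): repeated
# multiplication by 1.5 turns a Python float into inf after roughly 1700 steps,
# and int() on an infinite float raises the same error seen in add_shuffle_key.
batch = 1000.0                # mirrors batch = min(10 * numPartitions, 1000)
while batch != float("inf"):
    batch *= 1.5              # the `avg < 1` branch taken on every spill
print(batch)                  # inf
int(batch / 1.5)              # OverflowError: cannot convert float infinity to integer
{code}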