Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2021/05/26 03:29:00 UTC

[jira] [Assigned] (SPARK-35512) pyspark partitionBy may encounter 'OverflowError: cannot convert float infinity to integer'

     [ https://issues.apache.org/jira/browse/SPARK-35512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-35512:
------------------------------------

    Assignee: Apache Spark

> pyspark partitionBy may encounter 'OverflowError: cannot convert float infinity to integer'
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35512
>                 URL: https://issues.apache.org/jira/browse/SPARK-35512
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.2
>            Reporter: nolan liu
>            Assignee: Apache Spark
>            Priority: Major
>
> h2. Code sample
> {code:python}
> # pyspark
> rdd = ...
> new_rdd = rdd.partitionBy(64){code}
> An OverflowError is raised when the {color:#ff0000}input file is large{color} and the {color:#ff0000}executor memory{color} is not big enough.
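> A hypothetical local reproduction sketch (the memory setting, data size, and partition count below are illustrative placeholders, not taken from the original report):
> {code:python}
> from pyspark import SparkConf, SparkContext
>
> # Keep the Python worker memory limit small so the get_used_memory() > limit
> # check in add_shuffle_key keeps firing while records are still arriving.
> conf = (SparkConf()
>         .setAppName("spark-35512-repro")
>         .set("spark.python.worker.memory", "64m"))
> sc = SparkContext(conf=conf)
>
> # A key/value RDD large enough that a single task holds far more than 64m.
> rdd = sc.parallelize(range(50_000_000)).map(lambda i: (i, "x" * 64))
> new_rdd = rdd.partitionBy(64)
> new_rdd.count()
> {code}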
> h2. Error information
>  
> {code:java}
> TaskSetManager: Lost task 312.0 in stage 1.0 (TID 748, 11.4.137.5, executor 83): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
> File "/opt/spark3/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
> process()
> File "/opt/spark3/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
> serializer.dump_stream(out_iter, outfile)
> File "/opt/spark3/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
> for obj in iterator:
> File "/opt/spark3/python/lib/pyspark.zip/pyspark/rdd.py", line 1899, in add_shuffle_key
> OverflowError: cannot convert float infinity to integer
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:156)
> at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:130)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1420)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> h2. Spark code
>  [https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L2072]
> {code:python}
>             for k, v in iterator:
>                 buckets[partitionFunc(k) % numPartitions].append((k, v))
>                 c += 1
>                 # check used memory and avg size of chunk of objects
>                 if (c % 1000 == 0 and get_used_memory() > limit
>                         or c > batch):
>                     n, size = len(buckets), 0
>                     for split in list(buckets.keys()):
>                         yield pack_long(split)
>                         d = outputSerializer.dumps(buckets[split])
>                         del buckets[split]
>                         yield d
>                         size += len(d)
>                     avg = int(size / n) >> 20
>                     # let 1M < avg < 10M
>                     if avg < 1:
>                         batch *= 1.5
>                     elif avg > 10:
>                         batch = max(int(batch / 1.5), 1)
>                     c = 0
> {code}
> h2. Explanation
> *`batch`* may grow without bound while `*get_used_memory() > limit*` stays true and `*avg < 1*`: each pass through that branch executes `*batch *= 1.5*`, which turns `*batch*` into a float that eventually overflows to infinity. Once `*avg > 10*`, `*max(int(batch / 1.5), 1)*` raises the error, because `*int()*` cannot convert float infinity to an integer.
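> The mechanism can be demonstrated outside Spark (a minimal sketch of the failure mode, not the actual Spark code path; the starting value is illustrative):
> {code:python}
> batch = 1000                 # illustrative starting value
> while batch != float("inf"):
>     batch *= 1.5             # the avg < 1 branch: float multiplication silently overflows to inf
>
> int(batch / 1.5)             # the avg > 10 branch: OverflowError: cannot convert float infinity to integer
> {code}
> A possible mitigation (a suggestion here, not necessarily the change made for this ticket) is to cap the growth, e.g. `batch = min(sys.maxsize, batch * 1.5)`, so `*batch*` can never reach infinity.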
>  
>  


