Posted to issues@spark.apache.org by "Yuanjian Li (JIRA)" <ji...@apache.org> on 2019/01/09 03:47:00 UTC

[jira] [Commented] (SPARK-26573) Python worker not reused with mapPartitions if not consuming iterator

    [ https://issues.apache.org/jira/browse/SPARK-26573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737815#comment-16737815 ] 

Yuanjian Li commented on SPARK-26573:
-------------------------------------

Leaving some thoughts from the work on SPARK-26549.

It's easy to fix this problem for sc.parallelize(xrange) alone, because there the passed-in function is controlled by framework code and we can force it to consume the iterator.

But for a user-supplied function, things are different. The JVM side can't know whether the lambda consumed the iterator, so it always writes the iterator's data to the socket stream. The Python worker side therefore has to carefully skip any unread iterator data as well as the END_OF_DATA_SECTION marker, then jump to the end of the stream to check for END_OF_STREAM.
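A minimal sketch of that worker-side skip, assuming PySpark's length-prefixed framing on the socket stream. The marker values and the helper below are illustrative only (the real constants live in pyspark.serializers.SpecialLengths), not the actual pyspark.worker code:

{code:python}
# Illustrative sketch only, not the actual pyspark.worker implementation.
# Assumes each item on the socket stream is length-prefixed and that
# negative lengths are special markers (as with SpecialLengths).
END_OF_DATA_SECTION = -1  # assumed marker value
END_OF_STREAM = -4        # assumed marker value

def skip_unconsumed_input(infile, read_int):
    """Discard any items the user function never read, up to and
    including END_OF_DATA_SECTION, so the next read can check for
    END_OF_STREAM and the worker can be reused."""
    while True:
        length = read_int(infile)
        if length == END_OF_DATA_SECTION:
            return
        infile.read(length)  # skip over the serialized item's bytes
{code}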

> Python worker not reused with mapPartitions if not consuming iterator
> ---------------------------------------------------------------------
>
>                 Key: SPARK-26573
>                 URL: https://issues.apache.org/jira/browse/SPARK-26573
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Bryan Cutler
>            Priority: Major
>
> In PySpark, if the user calls RDD mapPartitions and does not consume the iterator, the Python worker will read the wrong signal and not be reused.  Test to replicate:
> {code:python}
> import os  # required for os.getpid() below
>
> def test_worker_reused_in_map_partition(self):
>     def map_pid(iterator):
>         # Fails when the iterator is not consumed; consuming it first,
>         # e.g. len(list(iterator)), makes the test pass
>         return (os.getpid() for _ in xrange(10))
>     rdd = self.sc.parallelize([], 10)
>     worker_pids_a = rdd.mapPartitions(map_pid).collect()
>     worker_pids_b = rdd.mapPartitions(map_pid).collect()
>     self.assertTrue(all([pid in worker_pids_a for pid in worker_pids_b]))
> {code}
> This is related to SPARK-26549, which fixes this issue but only for rdd.range
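For anyone hitting this before a fix lands, a user-side workaround sketch (not part of the report above): drain the partition iterator before producing output, which is what the len(list(iterator)) hint in the test comment amounts to.

{code:python}
import os

# Workaround sketch: fully consume the input iterator so the Python
# worker's socket stream stays in sync (it then reads END_OF_DATA_SECTION
# at the expected position) and the worker can be reused.
def map_pid(iterator):
    for _ in iterator:  # drain the (possibly empty) partition
        pass
    return (os.getpid() for _ in xrange(10))
{code}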


