Posted to commits@spark.apache.org by jo...@apache.org on 2015/02/26 20:56:29 UTC

spark git commit: [SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe

Repository: spark
Updated Branches:
  refs/heads/master cfff397f0 -> 7fa960e65


[SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe

Removing elements from a mutable HashSet while iterating over it can cause the
iteration to incorrectly skip over entries that were not removed. If this
happened, PythonRDD would write fewer broadcast variables than the Python
worker was expecting to read, which would cause the Python worker to hang
indefinitely.
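
The failure mode above comes from calling `oldBids.remove(bid)` inside `for (bid <- oldBids)`. That loop mutates a mutable set while its own iterator is still live. The sketch below shows the fixed pattern, under some assumptions. Broadcast ids are taken to be `Long` values held in a `scala.collection.mutable.Set`, and the `BroadcastDiff` object and `sync` method are hypothetical names, not Spark API. The idea is to snapshot the ids to drop with `diff`, which returns a new collection, and only then mutate the original set.

```scala
import scala.collection.mutable

object BroadcastDiff {
  // Hypothetical helper mirroring the fixed loop: returns the ids removed from
  // `oldBids` and the number of changes that would be reported to the worker.
  def sync(oldBids: mutable.Set[Long], newBids: Set[Long]): (Set[Long], Int) = {
    // Snapshot the stale ids BEFORE mutating oldBids; diff builds a new set.
    val toRemove = oldBids.diff(newBids).toSet
    val toAdd = newBids.diff(oldBids)
    val cnt = toRemove.size + toAdd.size
    // Safe: the loop iterates over the snapshot and mutates only oldBids.
    for (bid <- toRemove) {
      oldBids.remove(bid)
    }
    (toRemove, cnt)
  }
}
```

Because `toRemove` is a separate collection, removing from `oldBids` cannot disturb the loop. As a result, the count written up front always matches the number of removals actually performed.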

Author: Davies Liu <da...@databricks.com>

Closes #4776 from davies/fix_hang and squashes the following commits:

a4384a5 [Davies Liu] fix bug: remove() inside iterator is not safe


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fa960e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fa960e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fa960e6

Branch: refs/heads/master
Commit: 7fa960e653a905fc48d4097b49ce560cff919fa2
Parents: cfff397
Author: Davies Liu <da...@databricks.com>
Authored: Thu Feb 26 11:54:17 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Feb 26 11:54:27 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala  | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7fa960e6/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index dcb6e63..b1cec0f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
         val oldBids = PythonRDD.getWorkerBroadcasts(worker)
         val newBids = broadcastVars.map(_.id).toSet
         // number of different broadcasts
-        val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+        val toRemove = oldBids.diff(newBids)
+        val cnt = toRemove.size + newBids.diff(oldBids).size
         dataOut.writeInt(cnt)
-        for (bid <- oldBids) {
-          if (!newBids.contains(bid)) {
-            // remove the broadcast from worker
-            dataOut.writeLong(- bid - 1)  // bid >= 0
-            oldBids.remove(bid)
-          }
+        for (bid <- toRemove) {
+          // remove the broadcast from worker
+          dataOut.writeLong(- bid - 1)  // bid >= 0
+          oldBids.remove(bid)
         }
         for (broadcast <- broadcastVars) {
           if (!oldBids.contains(broadcast.id)) {
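
The removal loop writes each stale id as `- bid - 1`. Since `bid >= 0`, every removal is strictly negative (id 0 becomes -1), so a reader can tell removals from additions by sign alone. The sketch below round-trips that framing through `java.io.DataOutputStream` and `DataInputStream`. It is an illustration under several assumptions:

* The `BroadcastWire` object and its methods are hypothetical names.
* The diff is cut off inside the addition branch, so additions are written here as bare ids, and any payload Spark sends after them is omitted.
* The decoder is not the Python worker's actual code.

The sketch also shows why the count matters. The reader consumes exactly `cnt` values, so if fewer were written, a reader on a socket would wait for data that never arrives.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object BroadcastWire {
  // Hypothetical encoder: the change count, then one Long per change.
  // Removals use -bid - 1 as in the diff; additions are written as the plain id.
  def encode(toRemove: Seq[Long], toAdd: Seq[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(toRemove.size + toAdd.size)
    toRemove.foreach(bid => out.writeLong(-bid - 1)) // bid >= 0, so value <= -1
    toAdd.foreach(bid => out.writeLong(bid))
    out.flush()
    bytes.toByteArray
  }

  // Hypothetical decoder: reads exactly `cnt` values. If fewer were written,
  // readLong() on this in-memory stream throws EOFException; on a socket it
  // would block instead.
  def decode(data: Array[Byte]): (Seq[Long], Seq[Long]) = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val cnt = in.readInt()
    val values = Seq.fill(cnt)(in.readLong())
    val removed = values.filter(_ < 0).map(v => -v - 1) // invert -bid - 1
    val added = values.filter(_ >= 0)
    (removed, added)
  }
}
```

For example, `encode(Seq(0L, 7L), Seq(3L))` writes the count 3 followed by -1, -8 and 3. Decoding that output recovers removals `0, 7` and addition `3`.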

