Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/04 08:02:39 UTC

git commit: SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark

Repository: spark
Updated Branches:
  refs/heads/master 3894a49be -> 97a0bfe1c


SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark

JIRA: https://issues.apache.org/jira/browse/SPARK-2282

This issue is caused by a buildup of sockets in the TIME_WAIT state of TCP, a state that persists for some time after a connection closes.

This fix simply allows us to reuse local socket addresses that are still in TIME_WAIT, avoiding the buildup caused by the rapid creation of these sockets.
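For illustration, a minimal Scala sketch of the same idea follows; the host and port are placeholders, and the unconnected-Socket form is used because the JDK only guarantees that SO_REUSEADDR takes effect if it is set before the socket is bound:

    import java.net.{InetSocketAddress, Socket}

    // Create an unconnected socket so the option can be set before binding.
    val socket = new Socket()
    // Permit reuse of a local address still in TIME_WAIT from an earlier,
    // rapidly closed connection.
    socket.setReuseAddress(true)
    socket.connect(new InetSocketAddress("localhost", 9999)) // placeholder endpoint
    // ... exchange accumulator updates over the socket ...
    socket.close()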

Author: Aaron Davidson <aa...@databricks.com>

Closes #1220 from aarondav/SPARK-2282 and squashes the following commits:

2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a0bfe1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a0bfe1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a0bfe1

Branch: refs/heads/master
Commit: 97a0bfe1c0261384f09d53f9350de52fb6446d59
Parents: 3894a49
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Jul 3 23:02:36 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jul 3 23:02:36 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97a0bfe1/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f6570d3..462e094 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
       val socket = new Socket(serverHost, serverPort)
+      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+      socket.setReuseAddress(true)
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)