Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/12/29 12:35:49 UTC

[jira] [Resolved] (SPARK-11711) Finalizer memory leak in pyspark

     [ https://issues.apache.org/jira/browse/SPARK-11711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-11711.
-------------------------------
    Resolution: Duplicate

Merging this into the other issue, as it has slightly more discussion of the same problem.

> Finalizer memory leak in pyspark
> --------------------------------
>
>                 Key: SPARK-11711
>                 URL: https://issues.apache.org/jira/browse/SPARK-11711
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.1
>            Reporter: David Watson
>
> I've been seeing very consistent memory leaks in the Java process of Python Spark Streaming scripts on my driver.  A heap profile analysis showed millions of Finalizer objects.
> The Spark web interface, under Executor Thread Dump, shows:
> Thread 3: Finalizer (WAITING):
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.read(SocketInputStream.java:152)
> java.net.SocketInputStream.read(SocketInputStream.java:122)
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
> sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> java.io.BufferedReader.fill(BufferedReader.java:154)
> java.io.BufferedReader.readLine(BufferedReader.java:317)
> java.io.BufferedReader.readLine(BufferedReader.java:382)
> py4j.CallbackConnection.sendCommand(CallbackConnection.java:82)
> py4j.CallbackClient.sendCommand(CallbackClient.java:236)
> py4j.reflection.PythonProxyHandler.finalize(PythonProxyHandler.java:81)
> java.lang.System$2.invokeFinalize(System.java:1213)
> java.lang.ref.Finalizer.runFinalizer(Finalizer.java:98)
> java.lang.ref.Finalizer.access$100(Finalizer.java:34)
> java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:210)
> It appears the problem is with py4j.  I don't have a patch because the bug is inside the python/lib/py4j-0.8.2.1-src.zip file.  I've monkey-patched it, and that appears to fix the problem.
> In py4j.java_gateway.CallbackConnection.run() (around line 1186):
> """
>                 elif command == GARBAGE_COLLECT_PROXY_COMMAND_NAME:
>                     self.input.readline()
>                     del(self.pool[obj_id])
> """
> NOTE: it doesn't write a response to the socket!
> And on the Java side, in CallbackConnection.java, sendCommand() (line 82):
> """
> 			returnCommand = reader.readLine();
> """
> I don't know what the protocol is supposed to be, but the Java side waits for a response that the Python side never sends.  As you can see from the stack trace, this jams up the Java FinalizerThread, which keeps anything from getting finalized, Spark-related or not.
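> To make the mismatch concrete, here is a minimal sketch in plain Python sockets (not py4j code; all names are made up) of one side blocking on a read for an acknowledgement the other side never writes.  A timeout is used only so the demo exits instead of hanging the way the FinalizerThread does:
> """
> import socket
> import threading
> import time
>
> def python_side(server):
>     # Mimic the buggy handler: accept, read the command, but never reply.
>     conn, _ = server.accept()
>     conn.recv(1024)
>     time.sleep(10)  # no acknowledgement line is ever written back
>
> server = socket.socket()
> server.bind(("127.0.0.1", 0))
> server.listen(1)
> t = threading.Thread(target=python_side, args=(server,))
> t.daemon = True
> t.start()
>
> # Mimic the Java side: send a command, then block waiting for a response,
> # like reader.readLine() in CallbackConnection.sendCommand().
> java_side = socket.create_connection(server.getsockname(), timeout=2)
> java_side.sendall(b"g\nsome_obj_id\ne\n")
> try:
>     java_side.recv(1024)
> except socket.timeout:
>     print("no acknowledgement arrived; this is where the FinalizerThread hangs")
> """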
> My monkey patch to py4j.java_gateway.CallbackConnection.run() (around line 1186):
> """
>                 elif command == GARBAGE_COLLECT_PROXY_COMMAND_NAME:
>                     self.input.readline()
>                     del(self.pool[obj_id])
> +                    ## PATCH: send an empty response!
> +                    self.socket.sendall("\n")
> +                    ##
> """
> This bug appears to exist in the current py4j, but I can't find the repository for the 0.8.2.1 version embedded in Spark.
> I'm not entirely sure, but I suspect that (at least on the driver) this doesn't normally show up because the Java object references held by Python are long-lived, so finalization (and the resulting jammed FinalizerThread) wouldn't be triggered until the program ends.
> My code peeks at the checkpoint file (code below) before starting the script, which looks like it jams things up at the beginning, and any other finalized objects (Scala? Java?) pile up behind it.
> """
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
>
> def loadCheckpoint(checkpointPath):
>     StreamingContext._ensure_initialized()
>     gw = SparkContext._gateway
>     # Check whether valid checkpoint information exists in the given path
>     cp = gw.jvm.CheckpointReader.read(checkpointPath)
>     assert cp is not None, "Couldn't load %s" % checkpointPath
>     return cp.get()
> """
> At any rate, I can confirm that the same situation exists on the worker nodes as well as on the driver, and this fixes both.


