Posted to user@spark.apache.org by Jim Blomo <ji...@gmail.com> on 2014/03/22 02:18:44 UTC

pySpark memory usage

Hi all, I'm wondering if there's any settings I can use to reduce the
memory needed by the PythonRDD when computing simple stats.  I am
getting OutOfMemoryError exceptions while calculating count() on big,
but not absurd, records.  It seems like PythonRDD is trying to keep
too many of these records in memory, when all that is needed is to
stream through them and count.  Any tips for getting through this
workload?


Code:
session = sc.textFile('s3://...json.gz') # ~54GB of compressed data

# the biggest individual text line is ~3MB
parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
(loads(y), loads(s)))
parsed.persist(StorageLevel.MEMORY_AND_DISK)

parsed.count()
# will never finish: executor.Executor: Uncaught exception will FAIL
all executors

Incidentally the whole app appears to be killed, but this error is not
propagated to the shell.

Cluster:
15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)

Exception:
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
        at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
        at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
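
For reference, a minimal sketch of the same streaming count rewritten with the two mitigations that come up later in this thread (a smaller PySpark batchSize and more input partitions). The master URL, app name, S3 path, and partition count are placeholders, and loads is assumed to be json.loads:

from json import loads
from pyspark import SparkContext

# a smaller batchSize groups fewer records at a time between the JVM and the
# Python worker, so less has to be buffered per task
sc = SparkContext("local[4]", "count_example", batchSize=10)

# the second argument requests more (smaller) input partitions, e.g. 400
session = sc.textFile('s3://bucket/sessions/part-*.json.gz', 400)

parsed = session.map(lambda l: l.split("\t", 1)) \
                .map(lambda pair: (loads(pair[0]), loads(pair[1])))

# count() only needs to stream through each partition; persist() is only
# worth the memory if parsed is reused by later actions
print(parsed.count())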

Re: pySpark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
Cool, that’s good to hear. We’d also like to add spilling in Python itself, or at least make it exit with a good message if it can’t do it.

Matei

On May 14, 2014, at 10:47 AM, Jim Blomo <ji...@gmail.com> wrote:

> That worked amazingly well, thank you Matei!  Numbers that worked for
> me were 400 for the textFile()s, 1500 for the join()s.
> 
> On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <ma...@gmail.com> wrote:
>> Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 1000)).
>> 
>> Matei
>> 
>> On May 12, 2014, at 5:47 PM, Jim Blomo <ji...@gmail.com> wrote:
>> 
>>> Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.
>>> 
>>> I noticed that the S3 exceptions seem to occur more frequently when the
>>> box is swapping.  Why is the box swapping?  combineByKey seems to make
>>> the assumption that it can fit an entire partition in memory when
>>> doing the combineLocally step.  I'm going to try to break this apart
>>> but will need some sort of heuristic; options include looking at memory
>>> usage via the resource module and trying to keep below
>>> 'spark.executor.memory', or using batchSize to limit the number of
>>> entries in the dictionary.  Let me know if you have any opinions.
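
A minimal sketch of the resource-module heuristic described above, purely for illustration (the function name and the 10 GB budget echoing spark.executor.memory are made up):

import resource

def over_memory_budget(limit_bytes):
    # ru_maxrss is reported in kilobytes on Linux
    used_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024
    return used_bytes > limit_bytes

# e.g. flush partial combine results, or shrink the batch, once the Python
# worker gets close to the executor's memory budget
if over_memory_budget(10 * 1024 ** 3):
    pass  # spill to disk or reduce the batch size here
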
>>> 
>>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <il...@gmail.com> wrote:
>>>> I'd just like to update this thread by pointing to the PR based on our
>>>> initial design: https://github.com/apache/spark/pull/640
>>>> 
>>>> This solution is a little more general and avoids catching IOException
>>>> altogether. Long live exception propagation!
>>>> 
>>>> 
>>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>>>> 
>>>>> Hey Jim,
>>>>> 
>>>>> This IOException thing is a general issue that we need to fix and your
>>>>> observation is spot-on. There is actually a JIRA for it that I created a few
>>>>> days ago:
>>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>> 
>>>>> Aaron is assigned on that one but not actively working on it, so we'd
>>>>> welcome a PR from you on this if you are interested.
>>>>> 
>>>>> The first thought we had was to set a volatile flag when the reader sees
>>>>> an exception (indicating there was a failure in the task) and avoid
>>>>> swallowing the IOException in the writer if this happens. But I think there
>>>>> is a race here where the writer sees the error first before the reader knows
>>>>> what is going on.
>>>>> 
>>>>> Anyways maybe if you have a simpler solution you could sketch it out in
>>>>> the JIRA and we could talk over there. The current proposal in the JIRA is
>>>>> somewhat complicated...
>>>>> 
>>>>> - Patrick
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>> 
>>>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>>>> caused by a break in the connection to S3, from which the data was being
>>>>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>>>>> catching can potentially mask an exception for the data source, and that is
>>>>>> indeed what I see happening.  The underlying libraries (jets3t and
>>>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>>>> setting them through Spark code.  Instead I added the patch below which
>>>>>> kills the worker on the exception.  This allows me to completely load the
>>>>>> data source after a few worker retries.
>>>>>> 
>>>>>> Unfortunately, java.net.SocketException is the same error that is
>>>>>> sometimes expected from the client when using methods like take().  One
>>>>>> approach around this conflation is to create a new locally scoped exception
>>>>>> class, eg. WriterException, catch java.net.SocketException during output
>>>>>> writing, then re-throw the new exception.  The worker thread could then
>>>>>> distinguish between the reasons java.net.SocketException might be thrown.
>>>>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>>>> 
>>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>>> list instead.  Best,
>>>>>> 
>>>>>> Jim
>>>>>> 
>>>>>> diff --git
>>>>>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> index 0d71fdb..f31158c 100644
>>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>>            readerException = e
>>>>>>            Try(worker.shutdownOutput()) // kill Python worker process
>>>>>> 
>>>>>> +          case e: java.net.SocketException =>
>>>>>> +            // This can happen if a connection to the datasource, eg S3, resets
>>>>>> +            // or is otherwise broken
>>>>>> +            readerException = e
>>>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>>>> +
>>>>>>          case e: IOException =>
>>>>>>            // This can happen for legitimate reasons if the Python code stops returning data
>>>>>>            // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>> 
>>>>>> 
>>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>> 
>>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>>>>> 343)
>>>>>>> 
>>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>>>> file are? I'd like to reproduce and fix this.
>>>>>>>> 
>>>>>>>> Matei
>>>>>>>> 
>>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>> 
>>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>>> 0.9.0:
>>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>> 
>>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>>>> Java heap space.  To me this indicates a memory leak since Spark
>>>>>>>>> should simply be counting records of size < 3MB
>>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs
>>>>>>>>> the
>>>>>>>>> application, unknown root cause
>>>>>>>>> 
>>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>>>>>>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>>>>>>> problem:
>>>>>>>>> 
>>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>>>>>>> early
>>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>>      at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>>      at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>>      at
>>>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>>      at
>>>>>>>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>>      at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>>      at
>>>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>>      at
>>>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>>      at
>>>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>>      at
>>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>      at
>>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>      at
>>>>>>>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>>      at
>>>>>>>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>>      at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>>      at
>>>>>>>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>>      at
>>>>>>>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>>      at
>>>>>>>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>>      at
>>>>>>>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>>      at
>>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>      at
>>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>      at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>>      at
>>>>>>>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>>      at
>>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>>      at
>>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>>      at
>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>      at
>>>>>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>      at
>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with
>>>>>>>>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>>>>>>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>>>>>>>>> find it earlier in the thread.)
>>>>>>>>>> 
>>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>>>>>>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in
>>>>>>>>>> "conf" on all workers.
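
Concretely, the change described above amounts to one line in the copied file (shown for convenience; no other edits should be needed):

# conf/log4j.properties, copied from conf/log4j.properties.template
# change the line "log4j.rootCategory=INFO, console" to:
log4j.rootCategory=DEBUG, console
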
>>>>>>>>>> 
>>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably
>>>>>>>>>> large S3 data (multiple GB) and it was fine. It might happen only if records
>>>>>>>>>> are large, or something like that. How much heap are you giving to your
>>>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>>>> 
>>>>>>>>>> Matei
>>>>>>>>>> 
>>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>> 
>>>>>>>>>>> When I kill the python process, the stacktrace I get indicates
>>>>>>>>>>> that
>>>>>>>>>>> this happens at initialization.  It looks like the initial write to
>>>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>>>> waiting for output.  I haven't had luck turning on debugging for
>>>>>>>>>>> the
>>>>>>>>>>> executor process.  Still trying to learn the log4j properties that
>>>>>>>>>>> need to be set.
>>>>>>>>>>> 
>>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>> 
>>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>>>>>>>>> (crashed)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>>     at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>    at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>>>>>>>>>> Python
>>>>>>>>>>>> finished early` so frequently I wasn't able to load even a 1GB
>>>>>>>>>>>> file.
>>>>>>>>>>>> Let me know if I can provide any other info!
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Matei
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>>>>>>>>>>>> Hadoop
>>>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>>>>>>>>>>>>> none
>>>>>>>>>>>>>> have succeeded.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>>>>>>>>>>>> batchSize=1.  5
>>>>>>>>>>>>>> of the 175 executors hung, and I had to kill the python process
>>>>>>>>>>>>>> to get
>>>>>>>>>>>>>> things going again.  The only indication of this in the logs was
>>>>>>>>>>>>>> `INFO
>>>>>>>>>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>>>>>>>>>>>> several
>>>>>>>>>>>>>> tasks, before the app was failed:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>    at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>>    at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>>    at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>>    at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>>    at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>>    at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the
>>>>>>>>>>>>>> above,
>>>>>>>>>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>>>> makes it group multiple objects together before passing them between Java
>>>>>>>>>>>>>>> and Python, but this may be too high by default. Try passing batchSize=10 to
>>>>>>>>>>>>>>> your SparkContext constructor to lower it (the default is 1024). Or even
>>>>>>>>>>>>>>> batchSize=1 to match earlier versions.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>>>>>>>>>>>>>>>> reduce the
>>>>>>>>>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>>>>>>>>>>>>>>> on big,
>>>>>>>>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>>>>>>>>>>>>>>> keep
>>>>>>>>>>>>>>>> too many of these records in memory, when all that is needed
>>>>>>>>>>>>>>>> is to
>>>>>>>>>>>>>>>> stream through them and count.  Any tips for getting through
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>>>>>>>>>>>>>>>> (y,s):
>>>>>>>>>>>>>>>> (loads(y), loads(s)))
>>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception
>>>>>>>>>>>>>>>> will FAIL
>>>>>>>>>>>>>>>> all executors
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>>>>>>>>>>>>>> error is not
>>>>>>>>>>>>>>>> propagated to the shell.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>>>>>>>>>>>>>> spark.executor.memory=10GB)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>>   at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 


Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
Should add that I had to tweak the numbers a bit: high enough to avoid
swapping, but low enough to stay under the "Too many open files" error
(`ulimit -n` is 32768).
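
As a rough sketch of where numbers like those plug in (the RDD names, S3 paths, and tab-separated parsing below are hypothetical):

sessions = sc.textFile('s3n://bucket/sessions/*', 400)  # ~400 input partitions
events = sc.textFile('s3n://bucket/events/*', 400)

# join() needs (key, value) pairs; parsing details are illustrative
sessions_kv = sessions.map(lambda l: tuple(l.split("\t", 1)))
events_kv = events.map(lambda l: tuple(l.split("\t", 1)))

joined = sessions_kv.join(events_kv, 1500)  # ~1500 reduce-side partitions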

On Wed, May 14, 2014 at 10:47 AM, Jim Blomo <ji...@gmail.com> wrote:
> That worked amazingly well, thank you Matei!  Numbers that worked for
> me were 400 for the textFile()s, 1500 for the join()s.
>
> On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <ma...@gmail.com> wrote:
>> Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 1000)).
>>
>> Matei
>>
>> On May 12, 2014, at 5:47 PM, Jim Blomo <ji...@gmail.com> wrote:
>>
>>> Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.
>>>
>>> I noticed that the S3 exceptions seem to occur more frequently when the
>>> box is swapping.  Why is the box swapping?  combineByKey seems to make
>>> the assumption that it can fit an entire partition in memory when
>>> doing the combineLocally step.  I'm going to try to break this apart
>>> but will need some sort of heuristic; options include looking at memory
>>> usage via the resource module and trying to keep below
>>> 'spark.executor.memory', or using batchSize to limit the number of
>>> entries in the dictionary.  Let me know if you have any opinions.
>>>
>>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <il...@gmail.com> wrote:
>>>> I'd just like to update this thread by pointing to the PR based on our
>>>> initial design: https://github.com/apache/spark/pull/640
>>>>
>>>> This solution is a little more general and avoids catching IOException
>>>> altogether. Long live exception propagation!
>>>>
>>>>
>>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>>>>
>>>>> Hey Jim,
>>>>>
>>>>> This IOException thing is a general issue that we need to fix and your
>>>>> observation is spot-on. There is actually a JIRA for it that I created a few
>>>>> days ago:
>>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>>
>>>>> Aaron is assigned on that one but not actively working on it, so we'd
>>>>> welcome a PR from you on this if you are interested.
>>>>>
>>>>> The first thought we had was to set a volatile flag when the reader sees
>>>>> an exception (indicating there was a failure in the task) and avoid
>>>>> swallowing the IOException in the writer if this happens. But I think there
>>>>> is a race here where the writer sees the error first before the reader knows
>>>>> what is going on.
>>>>>
>>>>> Anyways maybe if you have a simpler solution you could sketch it out in
>>>>> the JIRA and we could talk over there. The current proposal in the JIRA is
>>>>> somewhat complicated...
>>>>>
>>>>> - Patrick
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>
>>>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>>>> caused by a break in the connection to S3, from which the data was being
>>>>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>>>>> catching can potentially mask an exception for the data source, and that is
>>>>>> indeed what I see happening.  The underlying libraries (jets3t and
>>>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>>>> setting them through Spark code.  Instead I added the patch below which
>>>>>> kills the worker on the exception.  This allows me to completely load the
>>>>>> data source after a few worker retries.
>>>>>>
>>>>>> Unfortunately, java.net.SocketException is the same error that is
>>>>>> sometimes expected from the client when using methods like take().  One
>>>>>> approach around this conflation is to create a new locally scoped exception
>>>>>> class, eg. WriterException, catch java.net.SocketException during output
>>>>>> writing, then re-throw the new exception.  The worker thread could then
>>>>>> distinguish between the reasons java.net.SocketException might be thrown.
>>>>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>>>>
>>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>>> list instead.  Best,
>>>>>>
>>>>>> Jim
>>>>>>
>>>>>> diff --git
>>>>>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> index 0d71fdb..f31158c 100644
>>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>>             readerException = e
>>>>>>             Try(worker.shutdownOutput()) // kill Python worker process
>>>>>>
>>>>>> +          case e: java.net.SocketException =>
>>>>>> +            // This can happen if a connection to the datasource, eg S3, resets
>>>>>> +            // or is otherwise broken
>>>>>> +            readerException = e
>>>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>>>> +
>>>>>>           case e: IOException =>
>>>>>>             // This can happen for legitimate reasons if the Python code stops returning data
>>>>>>             // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>
>>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>>>>> 343)
>>>>>>>
>>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>>>>>>> wrote:
>>>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>>>> file are? I'd like to reproduce and fix this.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>>
>>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>>> 0.9.0:
>>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>
>>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>>>> Java heap space.  To me this indicates a memory leak since Spark
>>>>>>>>> should simply be counting records of size < 3MB
>>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs
>>>>>>>>> the
>>>>>>>>> application, unknown root cause
>>>>>>>>>
>>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>>>>>>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>>>>>>> problem:
>>>>>>>>>
>>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>>>>>>> early
>>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>>       at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>>       at
>>>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>       at
>>>>>>>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>>       at
>>>>>>>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>>       at
>>>>>>>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>>       at
>>>>>>>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>>       at
>>>>>>>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>       at
>>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>       at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>>       at
>>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>>       at
>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>       at
>>>>>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>       at
>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>       at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with
>>>>>>>>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>>>>>>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>>>>>>>>> find it earlier in the thread.)
>>>>>>>>>>
>>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>>>>>>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in
>>>>>>>>>> "conf" on all workers.
>>>>>>>>>>
>>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably
>>>>>>>>>> large S3 data (multiple GB) and it was fine. It might happen only if records
>>>>>>>>>> are large, or something like that. How much heap are you giving to your
>>>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>>>>
>>>>>>>>>> Matei
>>>>>>>>>>
>>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>>
>>>>>>>>>>> When I kill the python process, the stacktrace I get indicates
>>>>>>>>>>> that
>>>>>>>>>>> this happens at initialization.  It looks like the initial write to
>>>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>>>> waiting for output.  I haven't had luck turning on debugging for
>>>>>>>>>>> the
>>>>>>>>>>> executor process.  Still trying to learn the log4j properties that
>>>>>>>>>>> need to be set.
>>>>>>>>>>>
>>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>>
>>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>>>>>>>>> (crashed)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>>      at
>>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>>      at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>     at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>      at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>>>>>>>>>> Python
>>>>>>>>>>>> finished early` so frequently I wasn't able to load even a 1GB
>>>>>>>>>>>> file.
>>>>>>>>>>>> Let me know if I can provide any other info!
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>>>>>>>>>>>> Hadoop
>>>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>>>>>>>>>>>>> none
>>>>>>>>>>>>>> have succeeded.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>>>>>>>>>>>> batchSize=1.  5
>>>>>>>>>>>>>> of the 175 executors hung, and I had to kill the python process
>>>>>>>>>>>>>> to get
>>>>>>>>>>>>>> things going again.  The only indication of this in the logs was
>>>>>>>>>>>>>> `INFO
>>>>>>>>>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>>>>>>>>>>>> several
>>>>>>>>>>>>>> tasks, before the app was failed:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>>     at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the
>>>>>>>>>>>>>> above,
>>>>>>>>>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>>>> makes it group multiple objects together before passing them between Java
>>>>>>>>>>>>>>> and Python, but this may be too high by default. Try passing batchSize=10 to
>>>>>>>>>>>>>>> your SparkContext constructor to lower it (the default is 1024). Or even
>>>>>>>>>>>>>>> batchSize=1 to match earlier versions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>>>>>>>>>>>>>>>> reduce the
>>>>>>>>>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>>>>>>>>>>>>>>> on big,
>>>>>>>>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>>>>>>>>>>>>>>> keep
>>>>>>>>>>>>>>>> too many of these records in memory, when all that is needed
>>>>>>>>>>>>>>>> is to
>>>>>>>>>>>>>>>> stream through them and count.  Any tips for getting through
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>>>>>>>>>>>>>>>> (y,s):
>>>>>>>>>>>>>>>> (loads(y), loads(s)))
>>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception
>>>>>>>>>>>>>>>> will FAIL
>>>>>>>>>>>>>>>> all executors
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>>>>>>>>>>>>>> error is not
>>>>>>>>>>>>>>>> propagated to the shell.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>>>>>>>>>>>>>> spark.executor.memory=10GB)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>

Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
That worked amazingly well, thank you Matei!  Numbers that worked for
me were 400 for the textFile()s, 1500 for the join()s.

On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 1000)).
>
> Matei
>
> On May 12, 2014, at 5:47 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.
>>
>> I noticed that the S3 exceptions seem to occur more frequently when the
>> box is swapping.  Why is the box swapping?  combineByKey seems to make
>> the assumption that it can fit an entire partition in memory when
>> doing the combineLocally step.  I'm going to try to break this apart
>> but will need some sort of heuristic; options include looking at memory
>> usage via the resource module and trying to keep below
>> 'spark.executor.memory', or using batchSize to limit the number of
>> entries in the dictionary.  Let me know if you have any opinions.
>>
>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <il...@gmail.com> wrote:
>>> I'd just like to update this thread by pointing to the PR based on our
>>> initial design: https://github.com/apache/spark/pull/640
>>>
>>> This solution is a little more general and avoids catching IOException
>>> altogether. Long live exception propagation!
>>>
>>>
>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>>>
>>>> Hey Jim,
>>>>
>>>> This IOException thing is a general issue that we need to fix and your
>>>> observation is spot-on. There is actually a JIRA for it that I created a few
>>>> days ago:
>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>
>>>> Aaron is assigned on that one but not actively working on it, so we'd
>>>> welcome a PR from you on this if you are interested.
>>>>
>>>> The first thought we had was to set a volatile flag when the reader sees
>>>> an exception (indicating there was a failure in the task) and avoid
>>>> swallowing the IOException in the writer if this happens. But I think there
>>>> is a race here where the writer sees the error first before the reader knows
>>>> what is going on.
>>>>
>>>> Anyways maybe if you have a simpler solution you could sketch it out in
>>>> the JIRA and we could talk over there. The current proposal in the JIRA is
>>>> somewhat complicated...
>>>>
>>>> - Patrick
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>
>>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>>> caused by a break in the connection to S3, from which the data was being
>>>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>>>> catching can potentially mask an exception for the data source, and that is
>>>>> indeed what I see happening.  The underlying libraries (jets3t and
>>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>>> setting them through Spark code.  Instead I added the patch below which
>>>>> kills the worker on the exception.  This allows me to completely load the
>>>>> data source after a few worker retries.
>>>>>
>>>>> Unfortunately, java.net.SocketException is the same error that is
>>>>> sometimes expected from the client when using methods like take().  One
>>>>> approach around this conflation is to create a new locally scoped exception
>>>>> class, eg. WriterException, catch java.net.SocketException during output
>>>>> writing, then re-throw the new exception.  The worker thread could then
>>>>> distinguish between the reasons java.net.SocketException might be thrown.
>>>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>>>
>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>> list instead.  Best,
>>>>>
>>>>> Jim
>>>>>
>>>>> diff --git
>>>>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> index 0d71fdb..f31158c 100644
>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>             readerException = e
>>>>>             Try(worker.shutdownOutput()) // kill Python worker process
>>>>>
>>>>> +          case e: java.net.SocketException =>
>>>>> +            // This can happen if a connection to the datasource, eg S3, resets
>>>>> +            // or is otherwise broken
>>>>> +            readerException = e
>>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>>> +
>>>>>           case e: IOException =>
>>>>>             // This can happen for legitimate reasons if the Python code stops returning data
>>>>>             // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>
>>>>>
>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>
>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>>>> 343)
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>>>>>> wrote:
>>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>>> file are? I'd like to reproduce and fix this.
>>>>>>>
>>>>>>> Matei
>>>>>>>
>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>
>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>> 0.9.0:
>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>
>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>>> Java heap space.  To me this indicates a memory leak since Spark
>>>>>>>> should simply be counting records of size < 3MB
>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs
>>>>>>>> the
>>>>>>>> application, unknown root cause
>>>>>>>>
>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>>>>>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>>>>>> problem:
>>>>>>>>
>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>>>>>> early
>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>       at
>>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>       at
>>>>>>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>       at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>       at
>>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>       at
>>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>       at
>>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>       at
>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>       at
>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>       at
>>>>>>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>       at
>>>>>>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>       at
>>>>>>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>       at
>>>>>>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>       at
>>>>>>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>       at
>>>>>>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>       at
>>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>       at
>>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>       at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>       at
>>>>>>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>       at
>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>       at
>>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>       at
>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>       at
>>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>       at
>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>       at
>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>       at
>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>       at
>>>>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>       at
>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>       at
>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>       at
>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with
>>>>>>>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>>>>>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>>>>>>>> find it earlier in the thread.)
>>>>>>>>>
>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>>>>>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in
>>>>>>>>> "conf" on all workers.
>>>>>>>>>
>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably
>>>>>>>>> large S3 data (multiple GB) and it was fine. It might happen only if records
>>>>>>>>> are large, or something like that. How much heap are you giving to your
>>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>>>
>>>>>>>>> Matei
>>>>>>>>>
>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>
>>>>>>>>>> When I kill the python process, the stacktrace I get indicates
>>>>>>>>>> that
>>>>>>>>>> this happens at initialization.  It looks like the initial write to
>>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>>> waiting for output.  I haven't had luck turning on debugging for
>>>>>>>>>> the
>>>>>>>>>> executor process.  Still trying to learn the log4j properties that
>>>>>>>>>> need to be set.
>>>>>>>>>>
>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>
>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>>>>>>>> (crashed)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>      at
>>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>      at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>     at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>      at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>>>>>>>>> Python
>>>>>>>>>>> finished early` so frequently I wasn't able to load even a 1GB
>>>>>>>>>>> file.
>>>>>>>>>>> Let me know if I can provide any other info!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>>>
>>>>>>>>>>>> Matei
>>>>>>>>>>>>
>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>>>>>>>>>>> Hadoop
>>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>>>>>>>>>>>> none
>>>>>>>>>>>>> have succeeded.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>>>>>>>>>>> batchSize=1.  5
>>>>>>>>>>>>> of the 175 executors hung, and I had to kill the python process
>>>>>>>>>>>>> to get
>>>>>>>>>>>>> things going again.  The only indication of this in the logs was
>>>>>>>>>>>>> `INFO
>>>>>>>>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>>>>>>>>>>> several
>>>>>>>>>>>>> tasks, before the app was failed:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>     at
>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>
>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the
>>>>>>>>>>>>> above,
>>>>>>>>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I
>>>>>>>>>>>>> should
>>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>>> makes it group multiple objects together before passing them between Java
>>>>>>>>>>>>>> and Python, but this may be too high by default. Try passing batchSize=10 to
>>>>>>>>>>>>>> your SparkContext constructor to lower it (the default is 1024). Or even
>>>>>>>>>>>>>> batchSize=1 to match earlier versions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>>>>>>>>>>>>>>> reduce the
>>>>>>>>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>>>>>>>>>>>>>> on big,
>>>>>>>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>>>>>>>>>>>>>> keep
>>>>>>>>>>>>>>> too many of these records in memory, when all that is needed
>>>>>>>>>>>>>>> is to
>>>>>>>>>>>>>>> stream through them and count.  Any tips for getting through
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>>>>>>>>>>>>>>> (y,s):
>>>>>>>>>>>>>>> (loads(y), loads(s)))
>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception
>>>>>>>>>>>>>>> will FAIL
>>>>>>>>>>>>>>> all executors
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>>>>>>>>>>>>> error is not
>>>>>>>>>>>>>>> propagated to the shell.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>>>>>>>>>>>>> spark.executor.memory=10GB)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>

Re: pySpark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 1000)).
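
For illustration, a minimal PySpark sketch of that workaround; the path and the partition counts (1000 and 2000) below are placeholders, not recommended values:

from pyspark import SparkContext

sc = SparkContext(appName="more-tasks-sketch")

# More map tasks: ask textFile() for a minimum number of input splits.
lines = sc.textFile("s3n://some-bucket/data.txt", 1000)

# More reduce tasks: shuffle operations such as reduceByKey() and join()
# also accept a partition count.
pairs = lines.map(lambda l: (l.split("\t", 1)[0], 1))
counts = pairs.reduceByKey(lambda a, b: a + b, 2000)
print(counts.count())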

Matei

On May 12, 2014, at 5:47 PM, Jim Blomo <ji...@gmail.com> wrote:

> Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.
> 
> I noticed that the S3 exceptions seem to occur more frequently when the
> box is swapping.  Why is the box swapping?  combineByKey seems to make
> the assumption that it can fit an entire partition in memory when
> doing the combineLocally step.  I'm going to try to break this apart
> but will need some sort of heuristic; options include looking at memory
> usage via the resource module and trying to keep below
> 'spark.executor.memory', or using batchSize to limit the number of
> entries in the dictionary.  Let me know if you have any opinions.
> 
> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <il...@gmail.com> wrote:
>> I'd just like to update this thread by pointing to the PR based on our
>> initial design: https://github.com/apache/spark/pull/640
>> 
>> This solution is a little more general and avoids catching IOException
>> altogether. Long live exception propagation!
>> 
>> 
>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>> 
>>> Hey Jim,
>>> 
>>> This IOException thing is a general issue that we need to fix and your
>>> observation is spot-on. There is actually a JIRA for it here that I created a few
>>> days ago:
>>> https://issues.apache.org/jira/browse/SPARK-1579
>>> 
>>> Aaron is assigned to that one but not actively working on it, so we'd
>>> welcome a PR from you on this if you are interested.
>>> 
>>> The first thought we had was to set a volatile flag when the reader sees
>>> an exception (indicating there was a failure in the task) and avoid
>>> swallowing the IOException in the writer if this happens. But I think there
>>> is a race here where the writer sees the error first before the reader knows
>>> what is going on.
>>> 
>>> Anyways maybe if you have a simpler solution you could sketch it out in
>>> the JIRA and we could talk over there. The current proposal in the JIRA is
>>> somewhat complicated...
>>> 
>>> - Patrick
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>> 
>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>> caused by a break in the connection to S3, from which the data was being
>>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>>> catching can potentially mask an exception for the data source, and that is
>>>> indeed what I see happening.  The underlying libraries (jets3t and
>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>> setting them through Spark code.  Instead I added the patch below which
>>>> kills the worker on the exception.  This allows me to completely load the
>>>> data source after a few worker retries.
>>>> 
>>>> Unfortunately, java.net.SocketException is the same error that is
>>>> sometimes expected from the client when using methods like take().  One
>>>> approach around this conflation is to create a new locally scoped exception
>>>> class, eg. WriterException, catch java.net.SocketException during output
>>>> writing, then re-throw the new exception.  The worker thread could then
>>>> distinguish between the reasons java.net.SocketException might be thrown.
>>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>> 
>>>> Let me know if I should open a ticket or discuss this on the developers
>>>> list instead.  Best,
>>>> 
>>>> Jim
>>>> 
>>>> diff --git
>>>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>> index 0d71fdb..f31158c 100644
>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>             readerException = e
>>>>             Try(worker.shutdownOutput()) // kill Python worker process
>>>> 
>>>> +          case e: java.net.SocketException =>
>>>> +           // This can happen if a connection to the datasource, eg S3,
>>>> resets
>>>> +           // or is otherwise broken
>>>> +            readerException = e
>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>> +
>>>>           case e: IOException =>
>>>>             // This can happen for legitimate reasons if the Python code
>>>> stops returning data
>>>>             // before we are done passing elements through, e.g., for
>>>> take(). Just log a message to
>>>> 
>>>> 
>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>> 
>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>>> 343)
>>>>> 
>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>>>>> wrote:
>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>> file are? I'd like to reproduce and fix this.
>>>>>> 
>>>>>> Matei
>>>>>> 
>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>> 
>>>>>>> To summarize, the issues I've seen are:
>>>>>>> 0.9.0:
>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>> 
>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>> Java heap space.  To me this indicates a memory leak since Spark
>>>>>>> should simply be counting records of size < 3MB
>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs
>>>>>>> the
>>>>>>> application, unknown root cause
>>>>>>> 
>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>>>>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>>>>> problem:
>>>>>>> 
>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>>>>> early
>>>>>>> java.net.SocketException: Connection reset
>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>       at
>>>>>>> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>       at
>>>>>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>       at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>       at
>>>>>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>       at
>>>>>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>       at
>>>>>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>       at
>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>       at
>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>       at
>>>>>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>       at
>>>>>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>       at
>>>>>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>       at
>>>>>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>       at
>>>>>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>       at
>>>>>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>       at
>>>>>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>       at
>>>>>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>       at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>       at
>>>>>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>       at
>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>       at
>>>>>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>       at
>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>       at
>>>>>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>       at
>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>       at
>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>       at
>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>       at
>>>>>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>       at
>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>       at
>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>       at
>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>> Cool, thanks for the update. Have you tried running a branch with
>>>>>>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>>>>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>>>>>>> find it earlier in the thread.)
>>>>>>>> 
>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>>>>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in
>>>>>>>> "conf" on all workers.
>>>>>>>> 
>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably
>>>>>>>> large S3 data (multiple GB) and it was fine. It might happen only if records
>>>>>>>> are large, or something like that. How much heap are you giving to your
>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>> 
>>>>>>>> Matei
>>>>>>>> 
>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>> 
>>>>>>>>> When I kill the python process, the stacktrace I get indicates
>>>>>>>>> that
>>>>>>>>> this happens at initialization.  It looks like the initial write to
>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>> waiting for output.  I haven't had luck turning on debugging for
>>>>>>>>> the
>>>>>>>>> executor process.  Still trying to learn the log4j properties that
>>>>>>>>> need to be set.
>>>>>>>>> 
>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>> 
>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>>>>>>> (crashed)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>      at
>>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>      at
>>>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>      at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>     at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>      at java.lang.Thread.run(Thread.java:724)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>>>>>>>> Python
>>>>>>>>>> finished early` so frequently I wasn't able to load even a 1GB
>>>>>>>>>> file.
>>>>>>>>>> Let me know if I can provide any other info!
>>>>>>>>>> 
>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>> 
>>>>>>>>>>> Matei
>>>>>>>>>>> 
>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>>>>>>>>>> Hadoop
>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>> 
>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>>>>>>>>>>> none
>>>>>>>>>>>> have succeeded.
>>>>>>>>>>>> 
>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>>>>>>>>>> batchSize=1.  5
>>>>>>>>>>>> of the 175 executors hung, and I had to kill the python process
>>>>>>>>>>>> to get
>>>>>>>>>>>> things going again.  The only indication of this in the logs was
>>>>>>>>>>>> `INFO
>>>>>>>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>> 
>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>>>>>>>>>> several
>>>>>>>>>>>> tasks, before the app was failed:
>>>>>>>>>>>> 
>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>     at
>>>>>>>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>     at
>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>     at
>>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>     at
>>>>>>>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>     at
>>>>>>>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>     at
>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>     at
>>>>>>>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>     at
>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>     at
>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>> 
>>>>>>>>>>>> There are other exceptions, but I think they all stem from the
>>>>>>>>>>>> above,
>>>>>>>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>> 
>>>>>>>>>>>> Let me know if there are other settings I should try, or if I
>>>>>>>>>>>> should
>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>> <ma...@gmail.com> wrote:
>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>> makes it group multiple objects together before passing them between Java
>>>>>>>>>>>>> and Python, but this may be too high by default. Try passing batchSize=10 to
>>>>>>>>>>>>> your SparkContext constructor to lower it (the default is 1024). Or even
>>>>>>>>>>>>> batchSize=1 to match earlier versions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Matei
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>>>>>>>>>>>>>> reduce the
>>>>>>>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>>>>>>>>>>>>> am
>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>>>>>>>>>>>>> on big,
>>>>>>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>>>>>>>>>>>>> keep
>>>>>>>>>>>>>> too many of these records in memory, when all that is needed
>>>>>>>>>>>>>> is to
>>>>>>>>>>>>>> stream through them and count.  Any tips for getting through
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>>>>>>>>>>>>>> data
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>>>>>>>>>>>>>> (y,s):
>>>>>>>>>>>>>> (loads(y), loads(s)))
>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception
>>>>>>>>>>>>>> will FAIL
>>>>>>>>>>>>>> all executors
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>>>>>>>>>>>> error is not
>>>>>>>>>>>>>> propagated to the shell.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>>>>>>>>>>>> spark.executor.memory=10GB)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>    at
>>>>>>>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>>> 
>> 


Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.

I noticed that the S3 exceptions seem to occur more frequently when the
box is swapping.  Why is the box swapping?  combineByKey seems to make
the assumption that it can fit an entire partition in memory when
doing the combineLocally step.  I'm going to try to break this apart
but will need some sort of heuristic; options include looking at memory
usage via the resource module and trying to keep below
'spark.executor.memory', or using batchSize to limit the number of
entries in the dictionary.  Let me know if you have any opinions.
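
A rough sketch of that heuristic (this is not code from Spark; the budget, the entry cap, and the flush callback are all assumptions for illustration):

import resource

MEMORY_BUDGET_BYTES = 8 * (1024 ** 3)  # e.g. stay safely under spark.executor.memory
MAX_ENTRIES = 100000                   # batchSize-style cap on the local dictionary

def peak_rss_bytes():
    # ru_maxrss is the peak resident set size: kilobytes on Linux, bytes on OS X.
    # It only grows, so it is a coarse "we are in trouble" signal, not a gauge.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024

def combine_locally(records, merge, flush):
    combined = {}
    for key, value in records:
        combined[key] = merge(combined[key], value) if key in combined else value
        if len(combined) >= MAX_ENTRIES or peak_rss_bytes() > MEMORY_BUDGET_BYTES:
            flush(combined)  # e.g. spill the partial aggregates to disk
            combined = {}
    if combined:
        flush(combined)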

On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <il...@gmail.com> wrote:
> I'd just like to update this thread by pointing to the PR based on our
> initial design: https://github.com/apache/spark/pull/640
>
> This solution is a little more general and avoids catching IOException
> altogether. Long live exception propagation!
>
>
> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>
>> Hey Jim,
>>
>> This IOException thing is a general issue that we need to fix and your
>> observation is spot-on. There is actually a JIRA for it here that I created a few
>> days ago:
>> https://issues.apache.org/jira/browse/SPARK-1579
>>
>> Aaron is assigned to that one but not actively working on it, so we'd
>> welcome a PR from you on this if you are interested.
>>
>> The first thought we had was to set a volatile flag when the reader sees
>> an exception (indicating there was a failure in the task) and avoid
>> swallowing the IOException in the writer if this happens. But I think there
>> is a race here where the writer sees the error first before the reader knows
>> what is going on.
>>
>> Anyways maybe if you have a simpler solution you could sketch it out in
>> the JIRA and we could talk over there. The current proposal in the JIRA is
>> somewhat complicated...
>>
>> - Patrick
>>
>>
>>
>>
>>
>>
>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>
>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>> caused by a break in the connection to S3, from which the data was being
>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>> catching can potentially mask an exception for the data source, and that is
>>> indeed what I see happening.  The underlying libraries (jets3t and
>>> httpclient) do have retry capabilities, but I don't see a great way of
>>> setting them through Spark code.  Instead I added the patch below which
>>> kills the worker on the exception.  This allows me to completely load the
>>> data source after a few worker retries.
>>>
>>> Unfortunately, java.net.SocketException is the same error that is
>>> sometimes expected from the client when using methods like take().  One
>>> approach around this conflation is to create a new locally scoped exception
>>> class, eg. WriterException, catch java.net.SocketException during output
>>> writing, then re-throw the new exception.  The worker thread could then
>>> distinguish between the reasons java.net.SocketException might be thrown.
>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>
>>> Let me know if I should open a ticket or discuss this on the developers
>>> list instead.  Best,
>>>
>>> Jim
>>>
>>> diff --git
>>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> index 0d71fdb..f31158c 100644
>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>              readerException = e
>>>              Try(worker.shutdownOutput()) // kill Python worker process
>>>
>>> +          case e: java.net.SocketException =>
>>> +           // This can happen if a connection to the datasource, eg S3,
>>> resets
>>> +           // or is otherwise broken
>>> +            readerException = e
>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>> +
>>>            case e: IOException =>
>>>              // This can happen for legitimate reasons if the Python code
>>> stops returning data
>>>              // before we are done passing elements through, e.g., for
>>> take(). Just log a message to
>>>
>>>
>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>
>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>> 343)
>>>>
>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>>>> wrote:
>>>> > Okay, thanks. Do you have any info on how large your records and data
>>>> > file are? I'd like to reproduce and fix this.
>>>> >
>>>> > Matei
>>>> >
>>>> > On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>> >
>>>> >> Hi Matei, thanks for working with me to find these issues.
>>>> >>
>>>> >> To summarize, the issues I've seen are:
>>>> >> 0.9.0:
>>>> >> - https://issues.apache.org/jira/browse/SPARK-1323
>>>> >>
>>>> >> SNAPSHOT 2014-03-18:
>>>> >> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>> >> Java heap space.  To me this indicates a memory leak since Spark
>>>> >> should simply be counting records of size < 3MB
>>>> >> - Without persist(), "stdin writer to Python finished early" hangs
>>>> >> the
>>>> >> application, unknown root cause
>>>> >>
>>>> >> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>>> >> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>> >> problem:
>>>> >>
>>>> >> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>>> >> early
>>>> >> java.net.SocketException: Connection reset
>>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>> >>        at
>>>> >> sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>> >>        at
>>>> >> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>> >>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>> >>        at
>>>> >> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>> >>        at
>>>> >> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>> >>        at
>>>> >> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>> >>        at
>>>> >> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>> >>        at
>>>> >> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>> >>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>> >>        at
>>>> >> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>> >>        at
>>>> >> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>> >>        at
>>>> >> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>> >>        at
>>>> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>> >>        at
>>>> >> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>> >>        at java.io.DataInputStream.read(DataInputStream.java:100)
>>>> >>        at
>>>> >> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>> >>        at
>>>> >> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>> >>        at
>>>> >> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>> >>        at
>>>> >> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>> >>        at
>>>> >> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>> >>        at
>>>> >> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> >>        at
>>>> >> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>> >>        at
>>>> >> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >>        at
>>>> >> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>> >>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >>        at
>>>> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >>        at
>>>> >> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>> >>        at
>>>> >> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> >>
>>>> >>
>>>> >> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>> >> <ma...@gmail.com> wrote:
>>>> >>> Cool, thanks for the update. Have you tried running a branch with
>>>> >>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>> >>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>>> >>> find it earlier in the thread.)
>>>> >>>
>>>> >>> To turn on debug logging, copy conf/log4j.properties.template to
>>>> >>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>> >>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in
>>>> >>> "conf" on all workers.
>>>> >>>
>>>> >>> BTW I've managed to run PySpark with this fix on some reasonably
>>>> >>> large S3 data (multiple GB) and it was fine. It might happen only if records
>>>> >>> are large, or something like that. How much heap are you giving to your
>>>> >>> executors, and does it show that much in the web UI?
>>>> >>>
>>>> >>> Matei
>>>> >>>
>>>> >>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>> >>>
>>>> >>>> I think the problem I ran into in 0.9 is covered in
>>>> >>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>> >>>>
>>>> >>>> When I kill the python process, the stacktrace I get indicates
>>>> >>>> that
>>>> >>>> this happens at initialization.  It looks like the initial write to
>>>> >>>> the Python process does not go through, and then the iterator hangs
>>>> >>>> waiting for output.  I haven't had luck turning on debugging for
>>>> >>>> the
>>>> >>>> executor process.  Still trying to learn the log4j properties that
>>>> >>>> need to be set.
>>>> >>>>
>>>> >>>> No luck yet on tracking down the memory leak.
>>>> >>>>
>>>> >>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>> >>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>> >>>> (crashed)
>>>> >>>>       at
>>>> >>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>> >>>>       at
>>>> >>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>> >>>>       at
>>>> >>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>> >>>>       at
>>>> >>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>> >>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>> >>>>       at
>>>> >>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>> >>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>> >>>>       at
>>>> >>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>> >>>>       at
>>>> >>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>> >>>>       at
>>>> >>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>> >>>>       at java.security.AccessController.doPrivileged(Native Method)
>>>> >>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>>> >>>>       at
>>>> >>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>> >>>>       at
>>>> >>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>> >>>>       at
>>>> >>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>> >>>>       at
>>>> >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> >>>>      at
>>>> >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> >>>>       at java.lang.Thread.run(Thread.java:724)
>>>> >>>>
>>>> >>>>
>>>> >>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>>>> >>>> wrote:
>>>> >>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>> >>>>> Python
>>>> >>>>> finished early` so frequently I wasn't able to load even a 1GB
>>>> >>>>> file.
>>>> >>>>> Let me know if I can provide any other info!
>>>> >>>>>
>>>> >>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>> >>>>> <ma...@gmail.com> wrote:
>>>> >>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>> >>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>> >>>>>>
>>>> >>>>>> Matei
>>>> >>>>>>
>>>> >>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>>>> >>>>>> wrote:
>>>> >>>>>>
>>>> >>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>> >>>>>>> Hadoop
>>>> >>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>> >>>>>>>
>>>> >>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>>> >>>>>>> none
>>>> >>>>>>> have succeeded.
>>>> >>>>>>>
>>>> >>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>> >>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>> >>>>>>> batchSize=1.  5
>>>> >>>>>>> of the 175 executors hung, and I had to kill the python process
>>>> >>>>>>> to get
>>>> >>>>>>> things going again.  The only indication of this in the logs was
>>>> >>>>>>> `INFO
>>>> >>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>> >>>>>>>
>>>> >>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>> >>>>>>> several
>>>> >>>>>>> tasks, before the app was failed:
>>>> >>>>>>>
>>>> >>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>> >>>>>>> thread Thread[stdin writer for python,5,main]
>>>> >>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>> >>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>> >>>>>>>      at java.lang.String.<init>(String.java:203)
>>>> >>>>>>>      at
>>>> >>>>>>> java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>> >>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>> >>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >>>>>>>      at
>>>> >>>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>> >>>>>>>      at
>>>> >>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> >>>>>>>
>>>> >>>>>>> There are other exceptions, but I think they all stem from the
>>>> >>>>>>> above,
>>>> >>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>> >>>>>>> BlockManagerMaster
>>>> >>>>>>>
>>>> >>>>>>> Let me know if there are other settings I should try, or if I
>>>> >>>>>>> should
>>>> >>>>>>> try a newer snapshot.
>>>> >>>>>>>
>>>> >>>>>>> Thanks again!
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>> >>>>>>> <ma...@gmail.com> wrote:
>>>> >>>>>>>> Hey Jim,
>>>> >>>>>>>>
>>>> >>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>> >>>>>>>> makes it group multiple objects together before passing them between Java
>>>> >>>>>>>> and Python, but this may be too high by default. Try passing batchSize=10 to
>>>> >>>>>>>> your SparkContext constructor to lower it (the default is 1024). Or even
>>>> >>>>>>>> batchSize=1 to match earlier versions.
>>>> >>>>>>>>
>>>> >>>>>>>> Matei
>>>> >>>>>>>>
>>>> >>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>>>> >>>>>>>> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>>>> >>>>>>>>> reduce the
>>>> >>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>>> >>>>>>>>> am
>>>> >>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>>> >>>>>>>>> on big,
>>>> >>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>>> >>>>>>>>> keep
>>>> >>>>>>>>> too many of these records in memory, when all that is needed
>>>> >>>>>>>>> is to
>>>> >>>>>>>>> stream through them and count.  Any tips for getting through
>>>> >>>>>>>>> this
>>>> >>>>>>>>> workload?
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Code:
>>>> >>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>>>> >>>>>>>>> data
>>>> >>>>>>>>>
>>>> >>>>>>>>> # the biggest individual text line is ~3MB
>>>> >>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>>>> >>>>>>>>> (y,s):
>>>> >>>>>>>>> (loads(y), loads(s)))
>>>> >>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>> >>>>>>>>>
>>>> >>>>>>>>> parsed.count()
>>>> >>>>>>>>> # will never finish: executor.Executor: Uncaught exception
>>>> >>>>>>>>> will FAIL
>>>> >>>>>>>>> all executors
>>>> >>>>>>>>>
>>>> >>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>> >>>>>>>>> error is not
>>>> >>>>>>>>> propagated to the shell.
>>>> >>>>>>>>>
>>>> >>>>>>>>> Cluster:
>>>> >>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>> >>>>>>>>> spark.executor.memory=10GB)
>>>> >>>>>>>>>
>>>> >>>>>>>>> Exception:
>>>> >>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>> >>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>> >>>>>>>>>     at
>>>> >>>>>>>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> >>>>>>>>
>>>> >>>>>>
>>>> >>>
>>>> >
>>>
>>>
>>
>

Re: pySpark memory usage

Posted by Aaron Davidson <il...@gmail.com>.
I'd just like to update this thread by pointing to the PR based on our
initial design: https://github.com/apache/spark/pull/640

This solution is a little more general and avoids catching IOException
altogether. Long live exception propagation!
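
For anyone following along, here is a generic Python sketch of the "propagate, don't swallow" idea; it is only an illustration, not what the PR above actually does (the real code lives on the Scala side, in PythonRDD):

import threading

class WriterThreadSketch(threading.Thread):
    # Hypothetical names; write_all stands in for "push all records to the worker".
    def __init__(self, write_all):
        super(WriterThreadSketch, self).__init__(name="stdin writer sketch")
        self.write_all = write_all
        self.failure = None

    def run(self):
        try:
            self.write_all()
        except BaseException as e:
            # Record the cause instead of logging it and moving on.
            self.failure = e

    def propagate_failure(self):
        # Called from the reading side: re-raise whatever killed the writer so
        # the real cause (e.g. a dropped S3 connection) surfaces, not a bare EOF.
        if self.failure is not None:
            raise self.failure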


On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Hey Jim,
>
> This IOException thing is a general issue that we need to fix and your
> observation is spot-on. There is actually a JIRA for it here that I created a
> few days ago:
> https://issues.apache.org/jira/browse/SPARK-1579
>
> Aaron is assigned to that one but not actively working on it, so we'd
> welcome a PR from you on this if you are interested.
>
> The first thought we had was to set a volatile flag when the reader sees
> an exception (indicating there was a failure in the task) and avoid
> swallowing the IOException in the writer if this happens. But I think there
> is a race here where the writer sees the error first before the reader
> knows what is going on.
>
> Anyways maybe if you have a simpler solution you could sketch it out in
> the JIRA and we could talk over there. The current proposal in the JIRA is
> somewhat complicated...
>
> - Patrick
>
>
>
>
>
>
> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> FYI, it looks like this "stdin writer to Python finished early" error was
>> caused by a break in the connection to S3, from which the data was being
>> pulled.  A recent commit to PythonRDD <https://github.com/apache/spark/commit/a967b005c8937a3053e215c952d2172ee3dc300d#commitcomment-6114780> noted that the current exception catching can potentially mask an exception
>> for the data source, and that is indeed what I see happening.  The
>> underlying libraries (jets3t and httpclient) do have retry capabilities,
>> but I don't see a great way of setting them through Spark code.  Instead I
>> added the patch below which kills the worker on the exception.  This allows
>> me to completely load the data source after a few worker retries.
>>
>> Unfortunately, java.net.SocketException is the same error that is
>> sometimes expected from the client when using methods like take().  One
>> approach around this conflation is to create a new locally scoped exception
>> class, eg. WriterException, catch java.net.SocketException during output
>> writing, then re-throw the new exception.  The worker thread could then
>> distinguish between the reasons java.net.SocketException might be thrown.
>>  Perhaps there is a more elegant way to do this in Scala, though?
>>
>> Let me know if I should open a ticket or discuss this on the developers
>> list instead.  Best,
>>
>> Jim
>>
>> diff --git
>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> index 0d71fdb..f31158c 100644
>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>              readerException = e
>>              Try(worker.shutdownOutput()) // kill Python worker process
>>
>> +          case e: java.net.SocketException =>
>> +           // This can happen if a connection to the datasource, eg S3,
>> resets
>> +           // or is otherwise broken
>> +            readerException = e
>> +            Try(worker.shutdownOutput()) // kill Python worker process
>> +
>>            case e: IOException =>
>>              // This can happen for legitimate reasons if the Python code
>> stops returning data
>>              // before we are done passing elements through, e.g., for
>> take(). Just log a message to
>>
>>
>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>>
>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>> 343)
>>>
>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>>> wrote:
>>> > Okay, thanks. Do you have any info on how large your records and data
>>> file are? I'd like to reproduce and fix this.
>>> >
>>> > Matei
>>> >
>>> > On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>>> >
>>> >> Hi Matei, thanks for working with me to find these issues.
>>> >>
>>> >> To summarize, the issues I've seen are:
>>> >> 0.9.0:
>>> >> - https://issues.apache.org/jira/browse/SPARK-1323
>>> >>
>>> >> SNAPSHOT 2014-03-18:
>>> >> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>> >> Java heap space.  To me this indicates a memory leak since Spark
>>> >> should simply be counting records of size < 3MB
>>> >> - Without persist(), "stdin writer to Python finished early" hangs the
>>> >> application, unknown root cause
>>> >>
>>> >> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>>> >> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>> >> problem:
>>> >>
>>> >> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>>> early
>>> >> java.net.SocketException: Connection reset
>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> >>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> >>        at
>>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> >>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> >>        at
>>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> >>        at
>>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>> >>        at
>>> sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> >>        at
>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>> >>        at
>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> >>        at
>>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>> >>        at
>>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>> >>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>> >>        at
>>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>> >>        at
>>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>> >>        at
>>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>> >>        at
>>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>> >>        at
>>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>> >>        at
>>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> >>        at java.io.DataInputStream.read(DataInputStream.java:100)
>>> >>        at
>>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>> >>        at
>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>> >>        at
>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>> >>        at
>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>> >>        at
>>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>> >>        at
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >>        at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>> >>        at
>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >>        at
>>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>> >>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>        at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>        at
>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>> >>        at
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>> >>
>>> >>
>>> >> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> >>> Cool, thanks for the update. Have you tried running a branch with
>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>> find it earlier in the thread.)
>>> >>>
>>> >>> To turn on debug logging, copy conf/log4j.properties.template to
>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present
>>> in "conf" on all workers.
>>> >>>
>>> >>> BTW I've managed to run PySpark with this fix on some reasonably
>>> large S3 data (multiple GB) and it was fine. It might happen only if
>>> records are large, or something like that. How much heap are you giving to
>>> your executors, and does it show that much in the web UI?
>>> >>>
>>> >>> Matei
>>> >>>
>>> >>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>> >>>
>>> >>>> I think the problem I ran into in 0.9 is covered in
>>> >>>> https://issues.apache.org/jira/browse/SPARK-1323
>>> >>>>
>>> >>>> When I kill the python process, the stacktrace I get indicates that
>>> >>>> this happens at initialization.  It looks like the initial write to
>>> >>>> the Python process does not go through, and then the iterator hangs
>>> >>>> waiting for output.  I haven't had luck turning on debugging for the
>>> >>>> executor process.  Still trying to learn the log4j properties that
>>> >>>> need to be set.
>>> >>>>
>>> >>>> No luck yet on tracking down the memory leak.
>>> >>>>
>>> >>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>> >>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>> (crashed)
>>> >>>>       at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>> >>>>       at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>> >>>>       at
>>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>> >>>>       at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>> >>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>> >>>>       at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>> >>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>> >>>>       at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>> >>>>       at
>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>> >>>>       at
>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>> >>>>       at java.security.AccessController.doPrivileged(Native Method)
>>> >>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>> >>>>       at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>> >>>>       at
>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>> >>>>       at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>> >>>>       at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> >>>>      at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> >>>>       at java.lang.Thread.run(Thread.java:724)
>>> >>>>
>>> >>>>
>>> >>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>>> wrote:
>>> >>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>> Python
>>> >>>>> finished early` so frequently I wasn't able to load even a 1GB
>>> file.
>>> >>>>> Let me know if I can provide any other info!
>>> >>>>>
>>> >>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> >>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>> 0.8)? We'll try to look into these, seems like a serious error.
>>> >>>>>>
>>> >>>>>> Matei
>>> >>>>>>
>>> >>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>>> wrote:
>>> >>>>>>
>>> >>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>> Hadoop
>>> >>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>> >>>>>>>
>>> >>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>>> none
>>> >>>>>>> have succeeded.
>>> >>>>>>>
>>> >>>>>>> I can get this to work -- with manual interventions -- if I omit
>>> >>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>> batchSize=1.  5
>>> >>>>>>> of the 175 executors hung, and I had to kill the python process
>>> to get
>>> >>>>>>> things going again.  The only indication of this in the logs was
>>> `INFO
>>> >>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>> >>>>>>>
>>> >>>>>>> With batchSize=1 and persist, a new memory error came up in
>>> several
>>> >>>>>>> tasks, before the app was failed:
>>> >>>>>>>
>>> >>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>> >>>>>>> thread Thread[stdin writer for python,5,main]
>>> >>>>>>> java.lang.OutOfMemoryError: Java heap space
>>> >>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>> >>>>>>>      at java.lang.String.<init>(String.java:203)
>>> >>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>> >>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>> >>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>> >>>>>>>      at
>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>> >>>>>>>      at
>>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>> >>>>>>>      at
>>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> >>>>>>>      at
>>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>> >>>>>>>      at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>>>>>>      at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>>>>>>      at
>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>> >>>>>>>      at
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>> >>>>>>>
>>> >>>>>>> There are other exceptions, but I think they all stem from the
>>> above,
>>> >>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>> >>>>>>> BlockManagerMaster
>>> >>>>>>>
>>> >>>>>>> Let me know if there are other settings I should try, or if I
>>> should
>>> >>>>>>> try a newer snapshot.
>>> >>>>>>>
>>> >>>>>>> Thanks again!
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <
>>> matei.zaharia@gmail.com> wrote:
>>> >>>>>>>> Hey Jim,
>>> >>>>>>>>
>>> >>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>> makes it group multiple objects together before passing them between Java
>>> and Python, but this may be too high by default. Try passing batchSize=10
>>> to your SparkContext constructor to lower it (the default is 1024). Or even
>>> batchSize=1 to match earlier versions.
>>> >>>>>>>>
>>> >>>>>>>> Matei
>>> >>>>>>>>
>>> >>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>>> reduce the
>>> >>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>>> am
>>> >>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>>> on big,
>>> >>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>>> keep
>>> >>>>>>>>> too many of these records in memory, when all that is needed
>>> is to
>>> >>>>>>>>> stream through them and count.  Any tips for getting through
>>> this
>>> >>>>>>>>> workload?
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> Code:
>>> >>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>>> data
>>> >>>>>>>>>
>>> >>>>>>>>> # the biggest individual text line is ~3MB
>>> >>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>>> (y,s):
>>> >>>>>>>>> (loads(y), loads(s)))
>>> >>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>> >>>>>>>>>
>>> >>>>>>>>> parsed.count()
>>> >>>>>>>>> # will never finish: executor.Executor: Uncaught exception
>>> will FAIL
>>> >>>>>>>>> all executors
>>> >>>>>>>>>
>>> >>>>>>>>> Incidentally the whole app appears to be killed, but this
>>> error is not
>>> >>>>>>>>> propagated to the shell.
>>> >>>>>>>>>
>>> >>>>>>>>> Cluster:
>>> >>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>> spark.executor.memory=10GB)
>>> >>>>>>>>>
>>> >>>>>>>>> Exception:
>>> >>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>> >>>>>>>>>     at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>> >>>>>>>>>     at
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>> >>>>>>>>>     at
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>> >>>>>>>>>     at
>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>> >>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>> >>>>>>>>>     at
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>> >>>>>>>>
>>> >>>>>>
>>> >>>
>>> >
>>>
>>
>>
>

Re: pySpark memory usage

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Jim,

This IOException thing is a general issue that we need to fix, and your
observation is spot-on. There is actually a JIRA for it that I created a
few days ago:
https://issues.apache.org/jira/browse/SPARK-1579

Aaron is assigned to that one but is not actively working on it, so we'd
welcome a PR from you on this if you are interested.

The first thought we had was to set a volatile flag when the reader sees an
exception (indicating there was a failure in the task) and avoid swallowing
the IOException in the writer if this happens. But I think there is a race
here where the writer sees the error first before the reader knows what is
going on.
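
A rough, untested sketch of that flag idea (the names here are made up for
illustration, not the actual PythonRDD code) would be something like the
following; it also shows exactly where the race could slip through:

import java.io.IOException

object VolatileFlagSketch {
  // Set by the reader thread when the task has actually failed.
  @volatile var readerFailed: Boolean = false

  // Writer loop: only swallow the IOException when the reader has NOT failed.
  def writeLoop(writeOne: () => Unit): Unit = {
    try {
      while (true) writeOne()
    } catch {
      case e: IOException =>
        if (readerFailed) {
          // The reader already saw a real failure; re-throw instead of hiding it.
          throw e
        } else {
          // Treat it as a legitimate early stop (e.g. take()) and exit quietly.
          // Race: if the writer lands here before the reader flips the flag,
          // the real failure is still masked.
        }
    }
  }

  def main(args: Array[String]): Unit = {
    writeLoop(() => throw new IOException("stream closed"))
    println("writer exited quietly; readerFailed=" + readerFailed)
  }
}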

Anyway, if you have a simpler solution, maybe you could sketch it out in the
JIRA and we could talk over there. The current proposal in the JIRA is
somewhat complicated...

- Patrick






On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <ji...@gmail.com> wrote:

> FYI, it looks like this "stdin writer to Python finished early" error was
> caused by a break in the connection to S3, from which the data was being
> pulled.  A recent commit to PythonRDD <https://github.com/apache/spark/commit/a967b005c8937a3053e215c952d2172ee3dc300d#commitcomment-6114780> noted that the current exception catching can potentially mask an exception
> for the data source, and that is indeed what I see happening.  The
> underlying libraries (jets3t and httpclient) do have retry capabilities,
> but I don't see a great way of setting them through Spark code.  Instead I
> added the patch below which kills the worker on the exception.  This allows
> me to completely load the data source after a few worker retries.
>
> Unfortunately, java.net.SocketException is the same error that is
> sometimes expected from the client when using methods like take().  One
> approach around this conflation is to create a new locally scoped exception
> class, eg. WriterException, catch java.net.SocketException during output
> writing, then re-throw the new exception.  The worker thread could then
> distinguish between the reasons java.net.SocketException might be thrown.
>  Perhaps there is a more elegant way to do this in Scala, though?
>
> Let me know if I should open a ticket or discuss this on the developers
> list instead.  Best,
>
> Jim
>
> diff --git
> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
> index 0d71fdb..f31158c 100644
> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>              readerException = e
>              Try(worker.shutdownOutput()) // kill Python worker process
>
> +          case e: java.net.SocketException =>
> +           // This can happen if a connection to the datasource, eg S3,
> resets
> +           // or is otherwise broken
> +            readerException = e
> +            Try(worker.shutdownOutput()) // kill Python worker process
> +
>            case e: IOException =>
>              // This can happen for legitimate reasons if the Python code
> stops returning data
>              // before we are done passing elements through, e.g., for
> take(). Just log a message to
>
>
> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>> 343)
>>
>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
>> wrote:
>> > Okay, thanks. Do you have any info on how large your records and data
>> file are? I'd like to reproduce and fix this.
>> >
>> > Matei
>> >
>> > On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>> >
>> >> Hi Matei, thanks for working with me to find these issues.
>> >>
>> >> To summarize, the issues I've seen are:
>> >> 0.9.0:
>> >> - https://issues.apache.org/jira/browse/SPARK-1323
>> >>
>> >> SNAPSHOT 2014-03-18:
>> >> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>> >> Java heap space.  To me this indicates a memory leak since Spark
>> >> should simply be counting records of size < 3MB
>> >> - Without persist(), "stdin writer to Python finished early" hangs the
>> >> application, unknown root cause
>> >>
>> >> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>> >> debugging turned on.  This gives me the stacktrace on the new "stdin"
>> >> problem:
>> >>
>> >> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished
>> early
>> >> java.net.SocketException: Connection reset
>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> >>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> >>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> >>        at
>> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>> >>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>> >>        at
>> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> >>        at
>> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>> >>        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> >>        at
>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>> >>        at
>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>> >>        at
>> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>> >>        at
>> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>> >>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>> >>        at
>> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>> >>        at
>> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>> >>        at
>> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>> >>        at
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>> >>        at
>> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>> >>        at
>> java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>> >>        at java.io.DataInputStream.read(DataInputStream.java:100)
>> >>        at
>> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>> >>        at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>> >>        at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>> >>        at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>> >>        at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>> >>        at
>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> >>        at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>> >>        at
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >>        at
>> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>> >>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>        at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>        at
>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>> >>        at
>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>> >>
>> >>
>> >> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <ma...@gmail.com>
>> wrote:
>> >>> Cool, thanks for the update. Have you tried running a branch with
>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>> memory leak issue are you referring to, is it separate from this? (Couldn't
>> find it earlier in the thread.)
>> >>>
>> >>> To turn on debug logging, copy conf/log4j.properties.template to
>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present
>> in "conf" on all workers.
>> >>>
>> >>> BTW I've managed to run PySpark with this fix on some reasonably
>> large S3 data (multiple GB) and it was fine. It might happen only if
>> records are large, or something like that. How much heap are you giving to
>> your executors, and does it show that much in the web UI?
>> >>>
>> >>> Matei
>> >>>
>> >>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>> >>>
>> >>>> I think the problem I ran into in 0.9 is covered in
>> >>>> https://issues.apache.org/jira/browse/SPARK-1323
>> >>>>
>> >>>> When I kill the python process, the stacktrace I get indicates that
>> >>>> this happens at initialization.  It looks like the initial write to
>> >>>> the Python process does not go through, and then the iterator hangs
>> >>>> waiting for output.  I haven't had luck turning on debugging for the
>> >>>> executor process.  Still trying to learn the log4j properties that
>> >>>> need to be set.
>> >>>>
>> >>>> No luck yet on tracking down the memory leak.
>> >>>>
>> >>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>> >>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>> (crashed)
>> >>>>       at
>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>> >>>>       at
>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>> >>>>       at
>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>> >>>>       at
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>> >>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>> >>>>       at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>> >>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>> >>>>       at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>> >>>>       at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>> >>>>       at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>> >>>>       at java.security.AccessController.doPrivileged(Native Method)
>> >>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>> >>>>       at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>> >>>>       at
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>> >>>>       at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>> >>>>       at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>>>      at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>>>       at java.lang.Thread.run(Thread.java:724)
>> >>>>
>> >>>>
>> >>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
>> wrote:
>> >>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>> >>>>> finished early` so frequently I wasn't able to load even a 1GB file.
>> >>>>> Let me know if I can provide any other info!
>> >>>>>
>> >>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <
>> matei.zaharia@gmail.com> wrote:
>> >>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>> 0.8)? We'll try to look into these, seems like a serious error.
>> >>>>>>
>> >>>>>> Matei
>> >>>>>>
>> >>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com>
>> wrote:
>> >>>>>>
>> >>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>> Hadoop
>> >>>>>>> 1.0.4" from GitHub on 2014-03-18.
>> >>>>>>>
>> >>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
>> none
>> >>>>>>> have succeeded.
>> >>>>>>>
>> >>>>>>> I can get this to work -- with manual interventions -- if I omit
>> >>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>> batchSize=1.  5
>> >>>>>>> of the 175 executors hung, and I had to kill the python process
>> to get
>> >>>>>>> things going again.  The only indication of this in the logs was
>> `INFO
>> >>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>> >>>>>>>
>> >>>>>>> With batchSize=1 and persist, a new memory error came up in
>> several
>> >>>>>>> tasks, before the app was failed:
>> >>>>>>>
>> >>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>> >>>>>>> thread Thread[stdin writer for python,5,main]
>> >>>>>>> java.lang.OutOfMemoryError: Java heap space
>> >>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>> >>>>>>>      at java.lang.String.<init>(String.java:203)
>> >>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>> >>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>> >>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>> >>>>>>>      at
>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>> >>>>>>>      at
>> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>> >>>>>>>      at
>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >>>>>>>      at
>> scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>> >>>>>>>      at
>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>>>>>>      at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>>>>>>      at
>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>> >>>>>>>      at
>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>> >>>>>>>
>> >>>>>>> There are other exceptions, but I think they all stem from the
>> above,
>> >>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>> >>>>>>> BlockManagerMaster
>> >>>>>>>
>> >>>>>>> Let me know if there are other settings I should try, or if I
>> should
>> >>>>>>> try a newer snapshot.
>> >>>>>>>
>> >>>>>>> Thanks again!
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <
>> matei.zaharia@gmail.com> wrote:
>> >>>>>>>> Hey Jim,
>> >>>>>>>>
>> >>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>> makes it group multiple objects together before passing them between Java
>> and Python, but this may be too high by default. Try passing batchSize=10
>> to your SparkContext constructor to lower it (the default is 1024). Or even
>> batchSize=1 to match earlier versions.
>> >>>>>>>>
>> >>>>>>>> Matei
>> >>>>>>>>
>> >>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
>> reduce the
>> >>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I
>> am
>> >>>>>>>>> getting OutOfMemoryError exceptions while calculating count()
>> on big,
>> >>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
>> keep
>> >>>>>>>>> too many of these records in memory, when all that is needed is
>> to
>> >>>>>>>>> stream through them and count.  Any tips for getting through
>> this
>> >>>>>>>>> workload?
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Code:
>> >>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
>> data
>> >>>>>>>>>
>> >>>>>>>>> # the biggest individual text line is ~3MB
>> >>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda
>> (y,s):
>> >>>>>>>>> (loads(y), loads(s)))
>> >>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>> >>>>>>>>>
>> >>>>>>>>> parsed.count()
>> >>>>>>>>> # will never finish: executor.Executor: Uncaught exception will
>> FAIL
>> >>>>>>>>> all executors
>> >>>>>>>>>
>> >>>>>>>>> Incidentally the whole app appears to be killed, but this error
>> is not
>> >>>>>>>>> propagated to the shell.
>> >>>>>>>>>
>> >>>>>>>>> Cluster:
>> >>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>> spark.executor.memory=10GB)
>> >>>>>>>>>
>> >>>>>>>>> Exception:
>> >>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>> >>>>>>>>>     at
>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>> >>>>>>>>>     at
>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>> >>>>>>>>>     at
>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>> >>>>>>>>>     at
>> scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>>>>>>>>     at
>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>> >>>>>>>>>     at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> >>>>>>>>>     at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> >>>>>>>>>     at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>> >>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>> >>>>>>>>>     at
>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>> >>>>>>>>
>> >>>>>>
>> >>>
>> >
>>
>
>

Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
FYI, it looks like this "stdin writer to Python finished early" error was
caused by a break in the connection to S3, from which the data was being
pulled.  A recent commit to PythonRDD
<https://github.com/apache/spark/commit/a967b005c8937a3053e215c952d2172ee3dc300d#commitcomment-6114780>
noted that the current exception catching can potentially mask an exception
for the data source, and that is indeed what I see happening.  The
underlying libraries (jets3t and httpclient) do have retry capabilities,
but I don't see a great way of setting them through Spark code.  Instead I
added the patch below which kills the worker on the exception.  This allows
me to completely load the data source after a few worker retries.

Unfortunately, java.net.SocketException is the same error that is sometimes
expected from the client when using methods like take().  One approach
around this conflation is to create a new locally scoped exception class,
eg. WriterException, catch java.net.SocketException during output writing,
then re-throw the new exception.  The worker thread could then distinguish
between the reasons java.net.SocketException might be thrown.  Perhaps
there is a more elegant way to do this in Scala, though?
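
As a rough, untested sketch (the names are just illustrative and this is not
part of the patch below), that wrapping could look something like:

import java.net.SocketException

object WriterExceptionSketch {
  // Locally scoped wrapper so callers can tell a data-source failure apart
  // from a SocketException coming from the Python worker side.
  class WriterException(cause: Throwable) extends Exception(cause)

  def writeFromSource(readOne: () => Unit): Unit = {
    try {
      readOne()
    } catch {
      // Only SocketExceptions raised while reading the data source get wrapped.
      case e: SocketException => throw new WriterException(e)
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      writeFromSource(() => throw new SocketException("Connection reset"))
    } catch {
      case _: WriterException => println("data-source failure: restart the worker")
      case _: SocketException => println("client-side close, e.g. take(): just log it")
    }
  }
}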

Let me know if I should open a ticket or discuss this on the developers
list instead.  Best,

Jim

diff --git
a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdb..f31158c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
             readerException = e
             Try(worker.shutdownOutput()) // kill Python worker process

+          case e: java.net.SocketException =>
+           // This can happen if a connection to the datasource, eg S3,
resets
+           // or is otherwise broken
+            readerException = e
+            Try(worker.shutdownOutput()) // kill Python worker process
+
           case e: IOException =>
             // This can happen for legitimate reasons if the Python code
stops returning data
             // before we are done passing elements through, e.g., for
take(). Just log a message to


On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <ji...@gmail.com> wrote:

> This dataset is uncompressed text at ~54GB. stats() returns (count:
> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
> 343)
>
> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com>
> wrote:
> > Okay, thanks. Do you have any info on how large your records and data
> file are? I'd like to reproduce and fix this.
> >
> > Matei
> >
> > On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
> >
> >> Hi Matei, thanks for working with me to find these issues.
> >>
> >> To summarize, the issues I've seen are:
> >> 0.9.0:
> >> - https://issues.apache.org/jira/browse/SPARK-1323
> >>
> >> SNAPSHOT 2014-03-18:
> >> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
> >> Java heap space.  To me this indicates a memory leak since Spark
> >> should simply be counting records of size < 3MB
> >> - Without persist(), "stdin writer to Python finished early" hangs the
> >> application, unknown root cause
> >>
> >> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
> >> debugging turned on.  This gives me the stacktrace on the new "stdin"
> >> problem:
> >>
> >> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
> >> java.net.SocketException: Connection reset
> >>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
> >>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
> >>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> >>        at
> sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> >>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> >>        at
> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> >>        at
> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
> >>        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> >>        at
> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
> >>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> >>        at
> org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
> >>        at
> org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
> >>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
> >>        at
> org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
> >>        at
> org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
> >>        at
> org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
> >>        at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
> >>        at
> java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
> >>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> >>        at java.io.DataInputStream.read(DataInputStream.java:100)
> >>        at
> org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
> >>        at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
> >>        at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
> >>        at
> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
> >>        at
> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
> >>        at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>        at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
> >>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
> >>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>        at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
> >>        at
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> >>
> >>
> >> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <ma...@gmail.com>
> wrote:
> >>> Cool, thanks for the update. Have you tried running a branch with this
> fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory
> leak issue are you referring to, is it separate from this? (Couldn't find
> it earlier in the thread.)
> >>>
> >>> To turn on debug logging, copy conf/log4j.properties.template to
> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
> to log4j.rootCategory=DEBUG, console. Then make sure this file is present
> in "conf" on all workers.
> >>>
> >>> BTW I've managed to run PySpark with this fix on some reasonably large
> S3 data (multiple GB) and it was fine. It might happen only if records are
> large, or something like that. How much heap are you giving to your
> executors, and does it show that much in the web UI?
> >>>
> >>> Matei
> >>>
> >>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
> >>>
> >>>> I think the problem I ran into in 0.9 is covered in
> >>>> https://issues.apache.org/jira/browse/SPARK-1323
> >>>>
> >>>> When I kill the python process, the stacktrace I get indicates that
> >>>> this happens at initialization.  It looks like the initial write to
> >>>> the Python process does not go through, and then the iterator hangs
> >>>> waiting for output.  I haven't had luck turning on debugging for the
> >>>> executor process.  Still trying to learn the log4j properties that
> >>>> need to be set.
> >>>>
> >>>> No luck yet on tracking down the memory leak.
> >>>>
> >>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
> >>>> org.apache.spark.SparkException: Python worker exited unexpectedly
> (crashed)
> >>>>       at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
> >>>>       at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
> >>>>       at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
> >>>>       at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
> >>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
> >>>>       at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
> >>>>       at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
> >>>>       at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
> >>>>       at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> >>>>       at java.security.AccessController.doPrivileged(Native Method)
> >>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
> >>>>       at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
> >>>>       at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
> >>>>       at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> >>>>       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >>>>      at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >>>>       at java.lang.Thread.run(Thread.java:724)
> >>>>
> >>>>
> >>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com>
> wrote:
> >>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
> >>>>> finished early` so frequently I wasn't able to load even a 1GB file.
> >>>>> Let me know if I can provide any other info!
> >>>>>
> >>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <
> matei.zaharia@gmail.com> wrote:
> >>>>>> I see, did this also fail with previous versions of Spark (0.9 or
> 0.8)? We'll try to look into these, seems like a serious error.
> >>>>>>
> >>>>>> Matei
> >>>>>>
> >>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
> >>>>>>> 1.0.4" from GitHub on 2014-03-18.
> >>>>>>>
> >>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but
> none
> >>>>>>> have succeeded.
> >>>>>>>
> >>>>>>> I can get this to work -- with manual interventions -- if I omit
> >>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
> batchSize=1.  5
> >>>>>>> of the 175 executors hung, and I had to kill the python process to
> get
> >>>>>>> things going again.  The only indication of this in the logs was
> `INFO
> >>>>>>> python.PythonRDD: stdin writer to Python finished early`.
> >>>>>>>
> >>>>>>> With batchSize=1 and persist, a new memory error came up in several
> >>>>>>> tasks, before the app was failed:
> >>>>>>>
> >>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
> >>>>>>> thread Thread[stdin writer for python,5,main]
> >>>>>>> java.lang.OutOfMemoryError: Java heap space
> >>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
> >>>>>>>      at java.lang.String.<init>(String.java:203)
> >>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
> >>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
> >>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
> >>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
> >>>>>>>      at
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
> >>>>>>>      at
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
> >>>>>>>      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >>>>>>>      at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
> >>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>>>>>      at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>>>>>      at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
> >>>>>>>      at
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> >>>>>>>
> >>>>>>> There are other exceptions, but I think they all stem from the
> above,
> >>>>>>> eg. org.apache.spark.SparkException: Error sending message to
> >>>>>>> BlockManagerMaster
> >>>>>>>
> >>>>>>> Let me know if there are other settings I should try, or if I
> should
> >>>>>>> try a newer snapshot.
> >>>>>>>
> >>>>>>> Thanks again!
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <
> matei.zaharia@gmail.com> wrote:
> >>>>>>>> Hey Jim,
> >>>>>>>>
> >>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
> makes it group multiple objects together before passing them between Java
> and Python, but this may be too high by default. Try passing batchSize=10
> to your SparkContext constructor to lower it (the default is 1024). Or even
> batchSize=1 to match earlier versions.
> >>>>>>>>
> >>>>>>>> Matei
> >>>>>>>>
> >>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com>
> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all, I'm wondering if there's any settings I can use to
> reduce the
> >>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
> >>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on
> big,
> >>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to
> keep
> >>>>>>>>> too many of these records in memory, when all that is needed is
> to
> >>>>>>>>> stream through them and count.  Any tips for getting through this
> >>>>>>>>> workload?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Code:
> >>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed
> data
> >>>>>>>>>
> >>>>>>>>> # the biggest individual text line is ~3MB
> >>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
> >>>>>>>>> (loads(y), loads(s)))
> >>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
> >>>>>>>>>
> >>>>>>>>> parsed.count()
> >>>>>>>>> # will never finish: executor.Executor: Uncaught exception will
> FAIL
> >>>>>>>>> all executors
> >>>>>>>>>
> >>>>>>>>> Incidentally the whole app appears to be killed, but this error
> is not
> >>>>>>>>> propagated to the shell.
> >>>>>>>>>
> >>>>>>>>> Cluster:
> >>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
> spark.executor.memory=10GB)
> >>>>>>>>>
> >>>>>>>>> Exception:
> >>>>>>>>> java.lang.OutOfMemoryError: Java heap space
> >>>>>>>>>     at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
> >>>>>>>>>     at
> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
> >>>>>>>>>     at
> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
> >>>>>>>>>     at
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>>>>>>>     at
> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
> >>>>>>>>>     at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> >>>>>>>>>     at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> >>>>>>>>>     at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
> >>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
> >>>>>>>>>     at
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> >>>>>>>>
> >>>>>>
> >>>
> >
>

Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
This dataset is uncompressed text at ~54GB. stats() returns (count:
56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
343)

On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Okay, thanks. Do you have any info on how large your records and data file are? I'd like to reproduce and fix this.
>
> Matei
>
> On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> Hi Matei, thanks for working with me to find these issues.
>>
>> To summarize, the issues I've seen are:
>> 0.9.0:
>> - https://issues.apache.org/jira/browse/SPARK-1323
>>
>> SNAPSHOT 2014-03-18:
>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>> Java heap space.  To me this indicates a memory leak since Spark
>> should simply be counting records of size < 3MB
>> - Without persist(), "stdin writer to Python finished early" hangs the
>> application, unknown root cause
>>
>> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>> problem:
>>
>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>> java.net.SocketException: Connection reset
>>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>        at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>        at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>        at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>        at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>        at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>        at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>        at java.io.DataInputStream.read(DataInputStream.java:100)
>>        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>
>>
>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> Cool, thanks for the update. Have you tried running a branch with this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak issue are you referring to, is it separate from this? (Couldn't find it earlier in the thread.)
>>>
>>> To turn on debug logging, copy conf/log4j.properties.template to conf/log4j.properties and change the line log4j.rootCategory=INFO, console to log4j.rootCategory=DEBUG, console. Then make sure this file is present in "conf" on all workers.
>>>
>>> BTW I've managed to run PySpark with this fix on some reasonably large S3 data (multiple GB) and it was fine. It might happen only if records are large, or something like that. How much heap are you giving to your executors, and does it show that much in the web UI?
>>>
>>> Matei
>>>
>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>
>>>> I think the problem I ran into in 0.9 is covered in
>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>
>>>> When I kill the python process, the stacktrace I get indicates that
>>>> this happens at initialization.  It looks like the initial write to
>>>> the Python process does not go through, and then the iterator hangs
>>>> waiting for output.  I haven't had luck turning on debugging for the
>>>> executor process.  Still trying to learn the log4j properties that
>>>> need to be set.
>>>>
>>>> No luck yet on tracking down the memory leak.
>>>>
>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>       at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>       at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>       at java.security.AccessController.doPrivileged(Native Method)
>>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>       at java.lang.Thread.run(Thread.java:724)
>>>>
>>>>
>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>>>> finished early` so frequently I wasn't able to load even a 1GB file.
>>>>> Let me know if I can provide any other info!
>>>>>
>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>
>>>>>> Matei
>>>>>>
>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>
>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>>>>> have succeeded.
>>>>>>>
>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>>>>> of the 175 executors hung, and I had to kill the python process to get
>>>>>>> things going again.  The only indication of this in the logs was `INFO
>>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>
>>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>>> tasks, before the app was failed:
>>>>>>>
>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>      at java.lang.String.<init>(String.java:203)
>>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>      at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>      at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>      at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>
>>>>>>> There are other exceptions, but I think they all stem from the above,
>>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>>> BlockManagerMaster
>>>>>>>
>>>>>>> Let me know if there are other settings I should try, or if I should
>>>>>>> try a newer snapshot.
>>>>>>>
>>>>>>> Thanks again!
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>>>>> Hey Jim,
>>>>>>>>
>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>>>>> too many of these records in memory, when all that is needed is to
>>>>>>>>> stream through them and count.  Any tips for getting through this
>>>>>>>>> workload?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Code:
>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>>>
>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>>>>>> (loads(y), loads(s)))
>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>
>>>>>>>>> parsed.count()
>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>>>>>> all executors
>>>>>>>>>
>>>>>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>>>>>> propagated to the shell.
>>>>>>>>>
>>>>>>>>> Cluster:
>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>>>
>>>>>>>>> Exception:
>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>
>>>>>>
>>>
>

Re: pySpark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
Okay, thanks. Do you have any info on how large your records and data file are? I’d like to reproduce and fix this.

Matei

On Apr 9, 2014, at 3:52 PM, Jim Blomo <ji...@gmail.com> wrote:

> Hi Matei, thanks for working with me to find these issues.
> 
> To summarize, the issues I've seen are:
> 0.9.0:
> - https://issues.apache.org/jira/browse/SPARK-1323
> 
> SNAPSHOT 2014-03-18:
> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
> Java heap space.  To me this indicates a memory leak since Spark
> should simply be counting records of size < 3MB
> - Without persist(), "stdin writer to Python finished early" hangs the
> application, unknown root cause
> 
> I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
> debugging turned on.  This gives me the stacktrace on the new "stdin"
> problem:
> 
> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
> java.net.SocketException: Connection reset
>        at java.net.SocketInputStream.read(SocketInputStream.java:196)
>        at java.net.SocketInputStream.read(SocketInputStream.java:122)
>        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>        at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>        at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>        at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>        at java.io.FilterInputStream.read(FilterInputStream.java:133)
>        at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>        at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>        at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>        at java.io.DataInputStream.read(DataInputStream.java:100)
>        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> 
> 
> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <ma...@gmail.com> wrote:
>> Cool, thanks for the update. Have you tried running a branch with this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak issue are you referring to, is it separate from this? (Couldn't find it earlier in the thread.)
>> 
>> To turn on debug logging, copy conf/log4j.properties.template to conf/log4j.properties and change the line log4j.rootCategory=INFO, console to log4j.rootCategory=DEBUG, console. Then make sure this file is present in "conf" on all workers.
>> 
>> BTW I've managed to run PySpark with this fix on some reasonably large S3 data (multiple GB) and it was fine. It might happen only if records are large, or something like that. How much heap are you giving to your executors, and does it show that much in the web UI?
>> 
>> Matei
>> 
>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>> 
>>> I think the problem I ran into in 0.9 is covered in
>>> https://issues.apache.org/jira/browse/SPARK-1323
>>> 
>>> When I kill the python process, the stacktrace I get indicates that
>>> this happens at initialization.  It looks like the initial write to
>>> the Python process does not go through, and then the iterator hangs
>>> waiting for output.  I haven't had luck turning on debugging for the
>>> executor process.  Still trying to learn the log4j properties that
>>> need to be set.
>>> 
>>> No luck yet on tracking down the memory leak.
>>> 
>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>       at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>       at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>       at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>       at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>       at java.security.AccessController.doPrivileged(Native Method)
>>>       at javax.security.auth.Subject.doAs(Subject.java:415)
>>>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>       at java.lang.Thread.run(Thread.java:724)
>>> 
>>> 
>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>>> finished early` so frequently I wasn't able to load even a 1GB file.
>>>> Let me know if I can provide any other info!
>>>> 
>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>>>>> 
>>>>> Matei
>>>>> 
>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>> 
>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>> 
>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>>>> have succeeded.
>>>>>> 
>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>>>> of the 175 executors hung, and I had to kill the python process to get
>>>>>> things going again.  The only indication of this in the logs was `INFO
>>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>> 
>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>> tasks, before the app was failed:
>>>>>> 
>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>      at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>      at java.lang.String.<init>(String.java:203)
>>>>>>      at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>      at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>      at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>      at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>      at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>      at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>      at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>      at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>> 
>>>>>> There are other exceptions, but I think they all stem from the above,
>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>> BlockManagerMaster
>>>>>> 
>>>>>> Let me know if there are other settings I should try, or if I should
>>>>>> try a newer snapshot.
>>>>>> 
>>>>>> Thanks again!
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>>>> Hey Jim,
>>>>>>> 
>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>>>>> 
>>>>>>> Matei
>>>>>>> 
>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>>>> too many of these records in memory, when all that is needed is to
>>>>>>>> stream through them and count.  Any tips for getting through this
>>>>>>>> workload?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Code:
>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>> 
>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>>>>> (loads(y), loads(s)))
>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>> 
>>>>>>>> parsed.count()
>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>>>>> all executors
>>>>>>>> 
>>>>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>>>>> propagated to the shell.
>>>>>>>> 
>>>>>>>> Cluster:
>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>> 
>>>>>>>> Exception:
>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>> 
>>>>> 
>> 


Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
Hi Matei, thanks for working with me to find these issues.

To summarize, the issues I've seen are:
0.9.0:
- https://issues.apache.org/jira/browse/SPARK-1323

SNAPSHOT 2014-03-18:
- When persist() is used and batchSize=1, java.lang.OutOfMemoryError:
Java heap space.  To me this indicates a memory leak since Spark
should simply be counting records of size < 3MB
- Without persist(), "stdin writer to Python finished early" hangs the
application, unknown root cause

I've recently rebuilt another SNAPSHOT, git commit 16b8308 with
debugging turned on.  This gives me the stacktrace on the new "stdin"
problem:

14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:196)
        at java.net.SocketInputStream.read(SocketInputStream.java:122)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
        at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
        at sun.security.ssl.InputRecord.read(InputRecord.java:509)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
        at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
        at java.io.FilterInputStream.read(FilterInputStream.java:133)
        at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
        at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
        at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)


On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <ma...@gmail.com> wrote:
> Cool, thanks for the update. Have you tried running a branch with this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak issue are you referring to, is it separate from this? (Couldn't find it earlier in the thread.)
>
> To turn on debug logging, copy conf/log4j.properties.template to conf/log4j.properties and change the line log4j.rootCategory=INFO, console to log4j.rootCategory=DEBUG, console. Then make sure this file is present in "conf" on all workers.
>
> BTW I've managed to run PySpark with this fix on some reasonably large S3 data (multiple GB) and it was fine. It might happen only if records are large, or something like that. How much heap are you giving to your executors, and does it show that much in the web UI?
>
> Matei
>
> On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> I think the problem I ran into in 0.9 is covered in
>> https://issues.apache.org/jira/browse/SPARK-1323
>>
>> When I kill the python process, the stacktrace I get indicates that
>> this happens at initialization.  It looks like the initial write to
>> the Python process does not go through, and then the iterator hangs
>> waiting for output.  I haven't had luck turning on debugging for the
>> executor process.  Still trying to learn the log4j properties that
>> need to be set.
>>
>> No luck yet on tracking down the memory leak.
>>
>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>        at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>        at java.security.AccessController.doPrivileged(Native Method)
>>        at javax.security.auth.Subject.doAs(Subject.java:415)
>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>        at java.lang.Thread.run(Thread.java:724)
>>
>>
>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com> wrote:
>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>> finished early` so frequently I wasn't able to load even a 1GB file.
>>> Let me know if I can provide any other info!
>>>
>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>>>>
>>>> Matei
>>>>
>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>
>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>
>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>>> have succeeded.
>>>>>
>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>>> of the 175 executors hung, and I had to kill the python process to get
>>>>> things going again.  The only indication of this in the logs was `INFO
>>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>>>
>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>> tasks, before the app was failed:
>>>>>
>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>> thread Thread[stdin writer for python,5,main]
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>       at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>       at java.lang.String.<init>(String.java:203)
>>>>>       at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>       at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>       at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>       at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>       at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>       at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>       at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>       at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>
>>>>> There are other exceptions, but I think they all stem from the above,
>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>> BlockManagerMaster
>>>>>
>>>>> Let me know if there are other settings I should try, or if I should
>>>>> try a newer snapshot.
>>>>>
>>>>> Thanks again!
>>>>>
>>>>>
>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>>> Hey Jim,
>>>>>>
>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>>>>
>>>>>> Matei
>>>>>>
>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>>> too many of these records in memory, when all that is needed is to
>>>>>>> stream through them and count.  Any tips for getting through this
>>>>>>> workload?
>>>>>>>
>>>>>>>
>>>>>>> Code:
>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>
>>>>>>> # the biggest individual text line is ~3MB
>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>>>> (loads(y), loads(s)))
>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>
>>>>>>> parsed.count()
>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>>>> all executors
>>>>>>>
>>>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>>>> propagated to the shell.
>>>>>>>
>>>>>>> Cluster:
>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>
>>>>>>> Exception:
>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>      at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>
>>>>
>

Re: pySpark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
Cool, thanks for the update. Have you tried running a branch with this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak issue are you referring to, is it separate from this? (Couldn’t find it earlier in the thread.)

To turn on debug logging, copy conf/log4j.properties.template to conf/log4j.properties and change the line log4j.rootCategory=INFO, console to log4j.rootCategory=DEBUG, console. Then make sure this file is present in “conf” on all workers.
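
For reference, the only change from the template is that one root-logger line; after the copy, the edited file would begin like this (everything else stays as shipped in the template):

# conf/log4j.properties (copied from conf/log4j.properties.template)
log4j.rootCategory=DEBUG, console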

BTW I’ve managed to run PySpark with this fix on some reasonably large S3 data (multiple GB) and it was fine. It might happen only if records are large, or something like that. How much heap are you giving to your executors, and does it show that much in the web UI?

Matei

On Mar 29, 2014, at 10:44 PM, Jim Blomo <ji...@gmail.com> wrote:

> I think the problem I ran into in 0.9 is covered in
> https://issues.apache.org/jira/browse/SPARK-1323
> 
> When I kill the python process, the stacktrace I get indicates that
> this happens at initialization.  It looks like the initial write to
> the Python process does not go through, and then the iterator hangs
> waiting for output.  I haven't had luck turning on debugging for the
> executor process.  Still trying to learn the log4j properties that
> need to be set.
> 
> No luck yet on tracking down the memory leak.
> 
> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>        at org.apache.spark.scheduler.Task.run(Task.scala:52)
>        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:415)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>        at java.lang.Thread.run(Thread.java:724)
> 
> 
> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com> wrote:
>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>> finished early` so frequently I wasn't able to load even a 1GB file.
>> Let me know if I can provide any other info!
>> 
>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <ma...@gmail.com> wrote:
>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>>> 
>>> Matei
>>> 
>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
>>> 
>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>> 1.0.4" from GitHub on 2014-03-18.
>>>> 
>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>> have succeeded.
>>>> 
>>>> I can get this to work -- with manual interventions -- if I omit
>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>> of the 175 executors hung, and I had to kill the python process to get
>>>> things going again.  The only indication of this in the logs was `INFO
>>>> python.PythonRDD: stdin writer to Python finished early`.
>>>> 
>>>> With batchSize=1 and persist, a new memory error came up in several
>>>> tasks, before the app was failed:
>>>> 
>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>> thread Thread[stdin writer for python,5,main]
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>       at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>       at java.lang.String.<init>(String.java:203)
>>>>       at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>       at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>       at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>       at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>       at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>       at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>       at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>       at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>> 
>>>> There are other exceptions, but I think they all stem from the above,
>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>> BlockManagerMaster
>>>> 
>>>> Let me know if there are other settings I should try, or if I should
>>>> try a newer snapshot.
>>>> 
>>>> Thanks again!
>>>> 
>>>> 
>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>>>> Hey Jim,
>>>>> 
>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>>> 
>>>>> Matei
>>>>> 
>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>> 
>>>>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>> too many of these records in memory, when all that is needed is to
>>>>>> stream through them and count.  Any tips for getting through this
>>>>>> workload?
>>>>>> 
>>>>>> 
>>>>>> Code:
>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>> 
>>>>>> # the biggest individual text line is ~3MB
>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>>> (loads(y), loads(s)))
>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>> 
>>>>>> parsed.count()
>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>>> all executors
>>>>>> 
>>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>>> propagated to the shell.
>>>>>> 
>>>>>> Cluster:
>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>> 
>>>>>> Exception:
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>      at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>> 
>>> 


Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
I think the problem I ran into in 0.9 is covered in
https://issues.apache.org/jira/browse/SPARK-1323

When I kill the python process, the stacktrace I get indicates that
this happens at initialization.  It looks like the initial write to
the Python process does not go through, and then the iterator hangs
waiting for output.  I haven't had luck turning on debugging for the
executor process.  Still trying to learn the log4j properties that
need to be set.

No luck yet on tracking down the memory leak.

14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:52)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)


On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <ji...@gmail.com> wrote:
> I've only tried 0.9, in which I ran into the `stdin writer to Python
> finished early` so frequently I wasn't able to load even a 1GB file.
> Let me know if I can provide any other info!
>
> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <ma...@gmail.com> wrote:
>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>>
>> Matei
>>
>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
>>
>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>> 1.0.4" from GitHub on 2014-03-18.
>>>
>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>> have succeeded.
>>>
>>> I can get this to work -- with manual interventions -- if I omit
>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>> of the 175 executors hung, and I had to kill the python process to get
>>> things going again.  The only indication of this in the logs was `INFO
>>> python.PythonRDD: stdin writer to Python finished early`.
>>>
>>> With batchSize=1 and persist, a new memory error came up in several
>>> tasks, before the app was failed:
>>>
>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>> thread Thread[stdin writer for python,5,main]
>>> java.lang.OutOfMemoryError: Java heap space
>>>        at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>        at java.lang.String.<init>(String.java:203)
>>>        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>        at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>        at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>        at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>        at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>        at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>
>>> There are other exceptions, but I think they all stem from the above,
>>> eg. org.apache.spark.SparkException: Error sending message to
>>> BlockManagerMaster
>>>
>>> Let me know if there are other settings I should try, or if I should
>>> try a newer snapshot.
>>>
>>> Thanks again!
>>>
>>>
>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>>> Hey Jim,
>>>>
>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>>
>>>> Matei
>>>>
>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>>
>>>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>> too many of these records in memory, when all that is needed is to
>>>>> stream through them and count.  Any tips for getting through this
>>>>> workload?
>>>>>
>>>>>
>>>>> Code:
>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>
>>>>> # the biggest individual text line is ~3MB
>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>>> (loads(y), loads(s)))
>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>
>>>>> parsed.count()
>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>>> all executors
>>>>>
>>>>> Incidentally the whole app appears to be killed, but this error is not
>>>>> propagated to the shell.
>>>>>
>>>>> Cluster:
>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>
>>>>> Exception:
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>
>>

Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
I've only tried 0.9, in which I ran into the `stdin writer to Python
finished early` so frequently I wasn't able to load even a 1GB file.
Let me know if I can provide any other info!

On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <ma...@gmail.com> wrote:
> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>
> Matei
>
> On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>> 1.0.4" from GitHub on 2014-03-18.
>>
>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>> have succeeded.
>>
>> I can get this to work -- with manual interventions -- if I omit
>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>> of the 175 executors hung, and I had to kill the python process to get
>> things going again.  The only indication of this in the logs was `INFO
>> python.PythonRDD: stdin writer to Python finished early`.
>>
>> With batchSize=1 and persist, a new memory error came up in several
>> tasks, before the app was failed:
>>
>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>> thread Thread[stdin writer for python,5,main]
>> java.lang.OutOfMemoryError: Java heap space
>>        at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>        at java.lang.String.<init>(String.java:203)
>>        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>        at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>        at org.apache.hadoop.io.Text.decode(Text.java:350)
>>        at org.apache.hadoop.io.Text.decode(Text.java:327)
>>        at org.apache.hadoop.io.Text.toString(Text.java:254)
>>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>        at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>
>> There are other exceptions, but I think they all stem from the above,
>> eg. org.apache.spark.SparkException: Error sending message to
>> BlockManagerMaster
>>
>> Let me know if there are other settings I should try, or if I should
>> try a newer snapshot.
>>
>> Thanks again!
>>
>>
>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>> Hey Jim,
>>>
>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>
>>> Matei
>>>
>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>>>
>>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>> too many of these records in memory, when all that is needed is to
>>>> stream through them and count.  Any tips for getting through this
>>>> workload?
>>>>
>>>>
>>>> Code:
>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>
>>>> # the biggest individual text line is ~3MB
>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>>> (loads(y), loads(s)))
>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>
>>>> parsed.count()
>>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>>> all executors
>>>>
>>>> Incidentally the whole app appears to be killed, but this error is not
>>>> propagated to the shell.
>>>>
>>>> Cluster:
>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>
>>>> Exception:
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>
>

Re: pySpark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We’ll try to look into these, seems like a serious error.

Matei

On Mar 27, 2014, at 7:27 PM, Jim Blomo <ji...@gmail.com> wrote:

> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
> 1.0.4" from GitHub on 2014-03-18.
> 
> I tried batchSizes of 512, 10, and 1 and each got me further but none
> have succeeded.
> 
> I can get this to work -- with manual interventions -- if I omit
> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
> of the 175 executors hung, and I had to kill the python process to get
> things going again.  The only indication of this in the logs was `INFO
> python.PythonRDD: stdin writer to Python finished early`.
> 
> With batchSize=1 and persist, a new memory error came up in several
> tasks, before the app was failed:
> 
> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
> thread Thread[stdin writer for python,5,main]
> java.lang.OutOfMemoryError: Java heap space
>        at java.util.Arrays.copyOfRange(Arrays.java:2694)
>        at java.lang.String.<init>(String.java:203)
>        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>        at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>        at org.apache.hadoop.io.Text.decode(Text.java:350)
>        at org.apache.hadoop.io.Text.decode(Text.java:327)
>        at org.apache.hadoop.io.Text.toString(Text.java:254)
>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>        at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> 
> There are other exceptions, but I think they all stem from the above,
> eg. org.apache.spark.SparkException: Error sending message to
> BlockManagerMaster
> 
> Let me know if there are other settings I should try, or if I should
> try a newer snapshot.
> 
> Thanks again!
> 
> 
> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
>> Hey Jim,
>> 
>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>> 
>> Matei
>> 
>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>> 
>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>> memory needed by the PythonRDD when computing simple stats.  I am
>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>> too many of these records in memory, when all that is needed is to
>>> stream through them and count.  Any tips for getting through this
>>> workload?
>>> 
>>> 
>>> Code:
>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>> 
>>> # the biggest individual text line is ~3MB
>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>> (loads(y), loads(s)))
>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>> 
>>> parsed.count()
>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>> all executors
>>> 
>>> Incidentally the whole app appears to be killed, but this error is not
>>> propagated to the shell.
>>> 
>>> Cluster:
>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>> 
>>> Exception:
>>> java.lang.OutOfMemoryError: Java heap space
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>> 


Re: pySpark memory usage

Posted by Jim Blomo <ji...@gmail.com>.
Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
1.0.4" from GitHub on 2014-03-18.

I tried batchSizes of 512, 10, and 1 and each got me further but none
have succeeded.

I can get this to work -- with manual interventions -- if I omit
`parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
of the 175 executors hung, and I had to kill the python process to get
things going again.  The only indication of this in the logs was `INFO
python.PythonRDD: stdin writer to Python finished early`.
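
Roughly, the run that got through looked like the sketch below (the S3 path is elided as above, the master URL and app name are placeholders, and loads is assumed to be json.loads):

from json import loads
from pyspark import SparkContext

# batchSize=1 on the constructor, and no persist() on the parsed RDD
sc = SparkContext("spark://...", "count-sessions", batchSize=1)
session = sc.textFile('s3://...json.gz')
parsed = session.map(lambda l: l.split("\t", 1)) \
                .map(lambda p: (loads(p[0]), loads(p[1])))
parsed.count()  # no parsed.persist(StorageLevel.MEMORY_AND_DISK) before this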

With batchSize=1 and persist, a new memory error came up in several
tasks, before the app was failed:

14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
thread Thread[stdin writer for python,5,main]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
        at java.nio.CharBuffer.toString(CharBuffer.java:1201)
        at org.apache.hadoop.io.Text.decode(Text.java:350)
        at org.apache.hadoop.io.Text.decode(Text.java:327)
        at org.apache.hadoop.io.Text.toString(Text.java:254)
        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
        at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

There are other exceptions, but I think they all stem from the above,
eg. org.apache.spark.SparkException: Error sending message to
BlockManagerMaster

Let me know if there are other settings I should try, or if I should
try a newer snapshot.

Thanks again!


On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <ma...@gmail.com> wrote:
> Hey Jim,
>
> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>
> Matei
>
> On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:
>
>> Hi all, I'm wondering if there's any settings I can use to reduce the
>> memory needed by the PythonRDD when computing simple stats.  I am
>> getting OutOfMemoryError exceptions while calculating count() on big,
>> but not absurd, records.  It seems like PythonRDD is trying to keep
>> too many of these records in memory, when all that is needed is to
>> stream through them and count.  Any tips for getting through this
>> workload?
>>
>>
>> Code:
>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>
>> # the biggest individual text line is ~3MB
>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>> (loads(y), loads(s)))
>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>
>> parsed.count()
>> # will never finish: executor.Executor: Uncaught exception will FAIL
>> all executors
>>
>> Incidentally the whole app appears to be killed, but this error is not
>> propagated to the shell.
>>
>> Cluster:
>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>
>> Exception:
>> java.lang.OutOfMemoryError: Java heap space
>>        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>        at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>        at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>        at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>

Re: pySpark memory usage

Posted by Matei Zaharia <ma...@gmail.com>.
Hey Jim,

In Spark 0.9 we added a “batchSize” parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
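
As a quick sketch, the constructor call would look something like this (the master URL and app name are placeholders):

from pyspark import SparkContext

# batchSize controls how many objects are grouped together before being
# passed between Java and Python; the default is 1024
sc = SparkContext("spark://...", "my-app", batchSize=10)
# or batchSize=1 to match earlier versions:
# sc = SparkContext("spark://...", "my-app", batchSize=1)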

Matei

On Mar 21, 2014, at 6:18 PM, Jim Blomo <ji...@gmail.com> wrote:

> Hi all, I'm wondering if there's any settings I can use to reduce the
> memory needed by the PythonRDD when computing simple stats.  I am
> getting OutOfMemoryError exceptions while calculating count() on big,
> but not absurd, records.  It seems like PythonRDD is trying to keep
> too many of these records in memory, when all that is needed is to
> stream through them and count.  Any tips for getting through this
> workload?
> 
> 
> Code:
> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
> 
> # the biggest individual text line is ~3MB
> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
> (loads(y), loads(s)))
> parsed.persist(StorageLevel.MEMORY_AND_DISK)
> 
> parsed.count()
> # will never finish: executor.Executor: Uncaught exception will FAIL
> all executors
> 
> Incidentally the whole app appears to be killed, but this error is not
> propagated to the shell.
> 
> Cluster:
> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
> 
> Exception:
> java.lang.OutOfMemoryError: Java heap space
>        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>        at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>        at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)