You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Ruslan Al-fakikh <ru...@jalent.ru> on 2012/01/06 04:14:33 UTC

RE: java.lang.OutOfMemoryError when using TOP udf

According to my calculations the biggest TOP number is 2380324
Could that be the reason of failure in maps?

-----Original Message-----
From: Jonathan Coveney [mailto:jcoveney@gmail.com] 
Sent: 28 декабря 2011 г. 23:19
To: user@pig.apache.org
Subject: Re: java.lang.OutOfMemoryError when using TOP udf

How large is TopNumber? I imagine that if your TopNumber is large enough, the UDF could still fail if the TopNumber # of values can't fit in the priority queue it puts together. Although in that final merge it could be smarter about it... will have to check the code when I get a chance to see if they are.

2011/12/27 Ruslan Al-fakikh <ru...@jalent.ru>

> Actually I fixed it. I had to use an additional grouping to make it 
> really Algebraic. But now I see OutOfMemory during Map merge:
>
> [2011-12-27 08:44:07] FATAL (Child.java:318) - Error running child :
> java.lang.OutOfMemoryError: Java heap space
>        at
> org.apache.hadoop.mapred.IFile$Reader.readNextBlock(IFile.java:355)
>        at org.apache.hadoop.mapred.IFile$Reader.next(IFile.java:417)
>        at org.apache.hadoop.mapred.Merger$Segment.next(Merger.java:220)
>        at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:420)
>        at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
>        at org.apache.hadoop.mapred.Merger.merge(Merger.java:77)
>        at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1548)
>        at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1180)
>        at
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:582)
>        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
>        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:396)
>        at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>        at org.apache.hadoop.mapred.Child.main(Child.java:264)
>
> Can anyone help?
>
> Thanks in advance!
>
> -----Original Message-----
> From: Ruslan Al-fakikh [mailto:ruslan.al-fakikh@jalent.ru]
> Sent: 22 декабря 2011 г. 5:38
> To: user@pig.apache.org
> Subject: RE: java.lang.OutOfMemoryError when using TOP udf
>
> Hey guys
>
> I did it according to the advice and moved the TOP execution the map 
> phase and now I am getting the same error, but now it comes from that map phase.
>
> Any help much appreciated!
>
> Here is my current code:
> https://gist.github.com/1508511
>
> Error stack trace:
> [2011-12-21 08:17:46] FATAL (Child.java:318) - Error running child :
> java.lang.OutOfMemoryError: Java heap space
>        at java.io.DataInputStream.readUTF(DataInputStream.java:644)
>        at java.io.DataInputStream.readUTF(DataInputStream.java:547)
>        at
> org.apache.pig.data.BinInterSedes.readCharArray(BinInterSedes.java:210)
>        at
> org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:333)
>        at
> org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:251)
>        at
> org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:555)
>        at
> org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:64)
>        at
> org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.readFromFile(DefaultDataBag.java:244)
>        at
> org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.next(DefaultDataBag.java:231)
>        at
> org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.hasNext(DefaultDataBag.java:157)
>        at org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
>        at org.apache.pig.builtin.TOP.exec(TOP.java:116)
>        at org.apache.pig.builtin.TOP.exec(TOP.java:65)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:245)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:287)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:338)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:240)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>        at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
>        at
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:237)
>        at
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232)
>        at
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
>        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
>        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
>        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:396)
>        at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>        at org.apache.hadoop.mapred.Child.main(Child.java:264)
>
> -----Original Message-----
> From: Dmitriy Ryaboy [mailto:dvryaboy@gmail.com]
> Sent: 17 декабря 2011 г. 0:16
> To: user@pig.apache.org
> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>
> I meant the latter, an actual join statement. So, generate the counts, 
> join them to the original relation, then group again and do TOP.
>
> D
>
> On Fri, Dec 16, 2011 at 5:32 AM, Ruslan Al-fakikh < 
> ruslan.al-fakikh@jalent.ru> wrote:
> > Dmitriy,
> >
> > You wrote
> >
> >> > Ok so this:
> >> >
> >> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
> >> >                                count = COUNT(thirdLevelsSummed);
> >> >                                result = TOP( (int)(count * 
> >> > (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, 
> >> > thirdLevelsSummed);
> >> >                                GENERATE FLATTEN(result); }
> >> >
> >> > requires "count" to be calculated before TOP can be applied. 
> >> > Since count can't be calculated until the reduce side, naturally, 
> >> > TOP can't start working on the map side (as it doesn't know its 
> >> > arguments
> yet). Try generating the counts * ($TLP + $BP) separately, joining 
> them in (I am guessing you have no more than a few K categories -- in 
> that case, you can do a replicated join), and then do group and TOP on.
> >
> > Probably I didn't understand your logic correctly. What I did is:
> > changed this:
> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
> >                                count = COUNT(thirdLevelsSummed);
> >                                result = TOP( (int)(count * (double) 
> > ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, 
> > thirdLevelsSummed);
> >                                GENERATE FLATTEN(result); } to this:
> > thirdLevelsTopNumberCounted = FOREACH thirdLevelsByCategory GENERATE
> >
> > group,
> >
> > thirdLevelsSummed,
> >
> > (int)( COUNT(thirdLevelsSummed) * (double) ($THIRD_LEVELS_PERCENTAGE 
> > +
> > $BOTS_PERCENTAGE) ) AS TopNumber;
> >
> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsTopNumberCounted 
> > GENERATE
> >
> > FLATTEN(TOP(TopNumber, 3, thirdLevelsSummed));
> >
> > So I removed the COUNT from the nested group. It didn't help. 
> > Probably
> you meant the JOIN ... USING 'replicated' statement, but I didn't get 
> how I can apply it here.
> >
> > Thanks
> >
> > -----Original Message-----
> > From: Ruslan Al-fakikh [mailto:ruslan.al-fakikh@jalent.ru]
> > Sent: 24 ноября 2011 г. 15:56
> > To: user@pig.apache.org
> > Subject: RE: java.lang.OutOfMemoryError when using TOP udf
> >
> > Hm. Interesting. Yeah, I really haven't seen the error after setting
> mapred.child.java.opts=-Xmx1024m.
> > Probably I won't have to fix the Pig script:)
> >
> > -----Original Message-----
> > From: Jonathan Coveney [mailto:jcoveney@gmail.com]
> > Sent: 23 ноября 2011 г. 11:46
> > To: user@pig.apache.org
> > Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> >
> > I have seen issues with spilling if it had less than 1GB of heap. 
> > Once I
> allocated enough ram, no issues. It seems unlikely to me that the bag 
> implementation fails on this because it's such a common use and nobody 
> has reported an error, and running with less than 1GB of heap is 
> definitely not recommended. Very curious if the error crops up again.
> >
> > 2011/11/22 pablomar <pa...@gmail.com>
> >
> >> just a guess .. could it be possible that the Bag is kept in memory 
> >> instead of being spilled to disk ?
> >> browsing the code of InternalCachedBag, I saw:
> >>
> >> private void init(int bagCount, float percent) {
> >>        factory = TupleFactory.getInstance();
> >>        mContents = new ArrayList<Tuple> < 
> >> http://javasourcecode.org/html/open-source/jdk/jdk-6u23/java/util/A
> >> rr
> >> a
> >> yList.java.html
> >> >();
> >>
> >>        long max = Runtime.getRuntime().maxMemory();
> >>        maxMemUsage = (long)(((float)max * percent) / 
> >> (float)bagCount);
> >>        cacheLimit = Integer.MAX_VALUE;
> >>
> >>        // set limit to 0, if memusage is 0 or really really small.
> >>    // then all tuples are put into disk        if (maxMemUsage < 1) {
> >>            cacheLimit = 0;
> >>        }
> >>
> >>        addDone = false;
> >>    }
> >>
> >> my guess is the cacheLimit was set to Integer.MAX_VALUE and it's 
> >> trying to keep all in memory when it is not big enough but not so 
> >> small to have cacheLimit reset to 0
> >>
> >>
> >>
> >>
> >> On Tue, Nov 22, 2011 at 10:08 AM, Ruslan Al-fakikh < 
> >> ruslan.al-fakikh@jalent.ru> wrote:
> >>
> >> > Jonathan,
> >> >
> >> > I am running it on Prod cluster in MR mode, not locally. I 
> >> > started to see the issue when input size grew. A few days ago I 
> >> > found a workaround of putting this property:
> >> > mapred.child.java.opts=-Xmx1024m
> >> > But I think this is a temporary solution and the job will fail 
> >> > when the input size will grow again.
> >> >
> >> > Dmitriy,
> >> >
> >> > Thanks a lot for the investigation. I'll try it.
> >> >
> >> > -----Original Message-----
> >> > From: Dmitriy Ryaboy [mailto:dvryaboy@gmail.com]
> >> > Sent: 22 ноября 2011 г. 2:21
> >> > To: user@pig.apache.org
> >> > Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> >> >
> >> > Ok so this:
> >> >
> >> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
> >> >                                count = COUNT(thirdLevelsSummed);
> >> >                                result = TOP( (int)(count * 
> >> > (double) ($THIRD_LEVELS_PERCENTAGE +
> >> > $BOTS_PERCENTAGE) ), 3, thirdLevelsSummed);
> >> >                                GENERATE FLATTEN(result); }
> >> >
> >> > requires "count" to be calculated before TOP can be applied. 
> >> > Since count can't be calculated until the reduce side, naturally, 
> >> > TOP can't start working on the map side (as it doesn't know its 
> >> > arguments
> yet).
> >> >
> >> > Try generating the counts * ($TLP + $BP) separately, joining them 
> >> > in (I
> >> am
> >> > guessing you have no more than a few K categories -- in that 
> >> > case, you
> >> can
> >> > do a replicated join), and then do group and TOP on.
> >> >
> >> > On Mon, Nov 21, 2011 at 1:53 PM, Jonathan Coveney 
> >> > <jc...@gmail.com>
> >> > wrote:
> >> > > You're right pablomar...hmm
> >> > >
> >> > > Ruslan: are you running this in mr mode on a cluster, or locally?
> >> > >
> >> > > I'm noticing this:
> >> > > [2011-11-16 12:34:55] INFO  (SpillableMemoryManager.java:154) - 
> >> > > first memory handler call- Usage threshold init =
> >> > > 175308800(171200K) used =
> >> > > 373454552(364701K) committed = 524288000(512000K) max =
> >> > > 524288000(512000K)
> >> > >
> >> > > It looks like your max memory is 512MB. I've had issues with 
> >> > > bag spilling with less than 1GB allocated (-Xmx1024mb).
> >> > >
> >> > > 2011/11/21 pablomar <pa...@gmail.com>
> >> > >
> >> > >> i might be wrong, but it seems the error comes from
> >> > >> while(itr.hasNext())
> >> > >> not from the add to the queue
> >> > >> so i don't think it is related to the number of elements in 
> >> > >> the queue ... maybe the field lenght?
> >> > >>
> >> > >> On 11/21/11, Jonathan Coveney <jc...@gmail.com> wrote:
> >> > >> > Internally, TOP is using a priority queue. It tries to be 
> >> > >> > smart about pulling off excess elements, but if you ask it 
> >> > >> > for enough elements, it
> >> > >> can
> >> > >> > blow up, because the priority queue is going to have n 
> >> > >> > elements, where n
> >> > >> is
> >> > >> > the ranking you want. This is consistent with the stack 
> >> > >> > trace, which died on updateTop which is when elements are 
> >> > >> > added to the
> >> > priority queue.
> >> > >> >
> >> > >> > Ruslan, how large are the limits you're setting? ie 
> >> > >> > (int)(count
> >> > >> > *
> >> > >> (double)
> >> > >> > ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) )
> >> > >> >
> >> > >> > As far as TOP's implementation, I imagine you could get 
> >> > >> > around the issue
> >> > >> by
> >> > >> > using a sorted data bag, but that might be much slower. hmm.
> >> > >> >
> >> > >> > 2011/11/21 Ruslan Al-fakikh <ru...@jalent.ru>
> >> > >> >
> >> > >> >> Ok. Here it is:
> >> > >> >> https://gist.github.com/1383266
> >> > >> >>
> >> > >> >> -----Original Message-----
> >> > >> >> From: Dmitriy Ryaboy [mailto:dvryaboy@gmail.com]
> >> > >> >> Sent: 21 ноября 2011 г. 20:32
> >> > >> >> To: user@pig.apache.org
> >> > >> >> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> >> > >> >>
> >> > >> >> Ruslan, I think the mailing list is set to reject 
> >> > >> >> attachments
> >> > >> >> -- can you post it as a github gist or something similar, 
> >> > >> >> and send a
> >> > link?
> >> > >> >>
> >> > >> >> D
> >> > >> >>
> >> > >> >> On Mon, Nov 21, 2011 at 6:11 AM, Ruslan Al-Fakikh 
> >> > >> >> <ru...@jalent.ru> wrote:
> >> > >> >> > Hey Dmitriy,
> >> > >> >> >
> >> > >> >> > I attached the script. It is not a plain-pig script, 
> >> > >> >> > because I make some preprocessing before submitting it to 
> >> > >> >> > cluster, but the general idea of what I submit is clear.
> >> > >> >> >
> >> > >> >> > Thanks in advance!
> >> > >> >> >
> >> > >> >> > On Fri, Nov 18, 2011 at 12:07 AM, Dmitriy Ryaboy 
> >> > >> >> > <dv...@gmail.com>
> >> > >> >> wrote:
> >> > >> >> >> Ok, so it's something in the rest of the script that's 
> >> > >> >> >> causing this to happen. Ruslan, if you send your script, 
> >> > >> >> >> I can probably figure out why (usually, it's using 
> >> > >> >> >> another, non-agebraic udf in your foreach, or for pig 
> >> > >> >> >> 0.8, generating a constant in the
> >> > foreach).
> >> > >> >> >>
> >> > >> >> >> D
> >> > >> >> >>
> >> > >> >> >> On Thu, Nov 17, 2011 at 9:59 AM, pablomar 
> >> > >> >> >> <pa...@gmail.com> wrote:
> >> > >> >> >>> according to the stack trace, the algebraic is not 
> >> > >> >> >>> being used it says
> >> > >> >> >>> updateTop(Top.java:139)
> >> > >> >> >>> exec(Top.java:116)
> >> > >> >> >>>
> >> > >> >> >>> On 11/17/11, Dmitriy Ryaboy <dv...@gmail.com> wrote:
> >> > >> >> >>>> The top udf does not try to process all data in memory 
> >> > >> >> >>>> if the algebraic optimization can be applied. It does 
> >> > >> >> >>>> need to keep the topn numbers in memory of course. Can 
> >> > >> >> >>>> you confirm algebraic mode is
> >> > >> >> used?
> >> > >> >> >>>>
> >> > >> >> >>>> On Nov 17, 2011, at 6:13 AM, "Ruslan Al-fakikh"
> >> > >> >> >>>> <ru...@jalent.ru>
> >> > >> >> >>>> wrote:
> >> > >> >> >>>>
> >> > >> >> >>>>> Hey guys,
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>> I encounter java.lang.OutOfMemoryError when using TOP udf.
> >> > >> >> >>>>> It seems that the udf tries to process all data in memory.
> >> > >> >> >>>>>
> >> > >> >> >>>>> Is there a workaround for TOP? Or maybe there is some 
> >> > >> >> >>>>> other way of getting top results? I cannot use LIMIT 
> >> > >> >> >>>>> since I need to 5% of data, not a constant number of rows.
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>> I am using:
> >> > >> >> >>>>>
> >> > >> >> >>>>> Apache Pig version 0.8.1-cdh3u2 (rexported)
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>> The stack trace is:
> >> > >> >> >>>>>
> >> > >> >> >>>>> [2011-11-16 12:34:55] INFO  (CodecPool.java:128) - 
> >> > >> >> >>>>> Got brand-new decompressor
> >> > >> >> >>>>>
> >> > >> >> >>>>> [2011-11-16 12:34:55] INFO  (Merger.java:473) - Down 
> >> > >> >> >>>>> to the last merge-pass, with 21 segments left of total size:
> >> > >> >> >>>>> 2057257173 bytes
> >> > >> >> >>>>>
> >> > >> >> >>>>> [2011-11-16 12:34:55] INFO
> >> > >> >> >>>>> (SpillableMemoryManager.java:154) - first memory 
> >> > >> >> >>>>> handler
> >> > >> >> >>>>> call- Usage threshold init =
> >> > >> >> >>>>> 175308800(171200K) used =
> >> > >> >> >>>>> 373454552(364701K) committed = 524288000(512000K) max 
> >> > >> >> >>>>> =
> >> > >> >> >>>>> 524288000(512000K)
> >> > >> >> >>>>>
> >> > >> >> >>>>> [2011-11-16 12:36:22] INFO
> >> > >> >> >>>>> (SpillableMemoryManager.java:167) - first memory 
> >> > >> >> >>>>> handler call - Collection threshold init =
> >> > >> >> >>>>> 175308800(171200K) used =
> >> > >> >> >>>>> 496500704(484863K) committed = 524288000(512000K) max 
> >> > >> >> >>>>> =
> >> > >> >> >>>>> 524288000(512000K)
> >> > >> >> >>>>>
> >> > >> >> >>>>> [2011-11-16 12:37:28] INFO  
> >> > >> >> >>>>> (TaskLogsTruncater.java:69)
> >> > >> >> >>>>> - Initializing logs'
> >> > >> >> >>>>> truncater with mapRetainSize=-1 and 
> >> > >> >> >>>>> reduceRetainSize=-1
> >> > >> >> >>>>>
> >> > >> >> >>>>> [2011-11-16 12:37:28] FATAL (Child.java:318) - Error 
> >> > >> >> >>>>> running
> >> > >> child :
> >> > >> >> >>>>> java.lang.OutOfMemoryError: Java heap space
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> java.util.Arrays.copyOfRange(Arrays.java:3209)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> java.lang.String.<init>(String.java:215)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> java.io.DataInputStream.readUTF(DataInputStream.java:
> >> > >> >> >>>>> 64
> >> > >> >> >>>>> 4
> >> > >> >> >>>>> )
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> java.io.DataInputStream.readUTF(DataInputStream.java:
> >> > >> >> >>>>> 54
> >> > >> >> >>>>> 7
> >> > >> >> >>>>> )
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.readCharArray(BinIn
> >> > >> >> >>>>> te
> >> > >> >> >>>>> r
> >> > >> >> >>>>> Sede
> >> > >> >> >>>>> s.java
> >> > >> >> >>>>> :210)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.readDatum(BinInterS
> >> > >> >> >>>>> ed
> >> > >> >> >>>>> e
> >> > >> >> >>>>> s.ja
> >> > >> >> >>>>> va:333
> >> > >> >> >>>>> )
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.readDatum(BinInterS
> >> > >> >> >>>>> ed
> >> > >> >> >>>>> e
> >> > >> >> >>>>> s.ja
> >> > >> >> >>>>> va:251
> >> > >> >> >>>>> )
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.addColsToTuple(BinI
> >> > >> >> >>>>> nt
> >> > >> >> >>>>> e
> >> > >> >> >>>>> rSed
> >> > >> >> >>>>> es.jav
> >> > >> >> >>>>> a:555)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.data.BinSedesTuple.readFields(BinSedes
> >> > >> >> >>>>> Tu
> >> > >> >> >>>>> p
> >> > >> >> >>>>> le.j
> >> > >> >> >>>>> ava:64
> >> > >> >> >>>>> )
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.data.InternalCachedBag$CachedBagIterator.
> >> > >> >> >>>>> hasN
> >> > >> >> >>>>> ext(In
> >> > >> >> >>>>> ternalCach
> >> > >> >> >>>>> edBag.java:237)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.builtin.TOP.exec(TOP.java:116)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.builtin.TOP.exec(TOP.java:65)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> >> > >> >> >>>>> expres
> >> > >> >> >>>>> sionOperat
> >> > >> >> >>>>> ors.POUserFunc.getNext(POUserFunc.java:245)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> >> > >> >> >>>>> expres
> >> > >> >> >>>>> sionOperat
> >> > >> >> >>>>> ors.POUserFunc.getNext(POUserFunc.java:287)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> >> > >> >> >>>>> relati
> >> > >> >> >>>>> onalOperat
> >> > >> >> >>>>> ors.POForEach.processPlan(POForEach.java:338)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> >> > >> >> >>>>> relati
> >> > >> >> >>>>> onalOperat
> >> > >> >> >>>>> ors.POForEach.getNext(POForEach.java:290)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> >> > >> >> >>>>> Physic
> >> > >> >> >>>>> alOperator
> >> > >> >> >>>>> .processInput(PhysicalOperator.java:276)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> >> > >> >> >>>>> relati
> >> > >> >> >>>>> onalOperat
> >> > >> >> >>>>> ors.POForEach.getNext(POForEach.java:240)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> >> > >> >> >>>>> ce
> >> > >> >> >>>>> L
> >> > >> >> >>>>> ayer
> >> > >> >> >>>>> .PigMa
> >> > >> >> >>>>> pReduce$Re
> >> > >> >> >>>>> duce.runPipeline(PigMapReduce.java:434)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> >> > >> >> >>>>> ce
> >> > >> >> >>>>> L
> >> > >> >> >>>>> ayer
> >> > >> >> >>>>> .PigMa
> >> > >> >> >>>>> pReduce$Re
> >> > >> >> >>>>> duce.processOnePackageOutput(PigMapReduce.java:402)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> >> > >> >> >>>>> ce
> >> > >> >> >>>>> L
> >> > >> >> >>>>> ayer
> >> > >> >> >>>>> .PigMa
> >> > >> >> >>>>> pReduce$Re
> >> > >> >> >>>>> duce.reduce(PigMapReduce.java:382)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> >> > >> >> >>>>> ce
> >> > >> >> >>>>> L
> >> > >> >> >>>>> ayer
> >> > >> >> >>>>> .PigMa
> >> > >> >> >>>>> pReduce$Re
> >> > >> >> >>>>> duce.reduce(PigMapReduce.java:251)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:
> >> > >> >> >>>>> 17
> >> > >> >> >>>>> 6
> >> > >> >> >>>>> )
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> >> > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:
> >> > >> >> >>>>> 572)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:
> >> > >> >> >>>>> 414)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> java.security.AccessController.doPrivileged(Native
> >> > >> >> >>>>> Method)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> javax.security.auth.Subject.doAs(Subject.java:396)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.hadoop.security.UserGroupInformation.doAs(
> >> > >> >> >>>>> Us
> >> > >> >> >>>>> e
> >> > >> >> >>>>> rGro
> >> > >> >> >>>>> upInfo
> >> > >> >> >>>>> rmation.ja
> >> > >> >> >>>>> va:1127)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> org.apache.hadoop.mapred.Child.main(Child.java:264)
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>> stderr logs
> >> > >> >> >>>>>
> >> > >> >> >>>>> Exception in thread "Low Memory Detector"
> >> > >> >> >>>>> java.lang.OutOfMemoryError: Java heap space
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> sun.management.MemoryNotifInfoCompositeData.getCompos
> >> > >> >> >>>>> it
> >> > >> >> >>>>> e
> >> > >> >> >>>>> Data
> >> > >> >> >>>>> (Memor
> >> > >> >> >>>>> yNotifInfo
> >> > >> >> >>>>> CompositeData.java:42)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> sun.management.MemoryNotifInfoCompositeData.toComposi
> >> > >> >> >>>>> te
> >> > >> >> >>>>> D
> >> > >> >> >>>>> ata(
> >> > >> >> >>>>> Memory
> >> > >> >> >>>>> NotifInfoC
> >> > >> >> >>>>> ompositeData.java:36)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> sun.management.MemoryImpl.createNotification(MemoryImpl.
> >> > >> >> >>>>> java
> >> > >> >> >>>>> :168)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>>
> >> > >> >>
> >> > >> >>
> >> > >>
> >> >
> >>
> sun.management.MemoryPoolImpl$CollectionSensor.triggerAction(MemoryPoolImpl.
> >> > >> >> >>>>> java:300)
> >> > >> >> >>>>>
> >> > >> >> >>>>>                at
> >> > >> >> >>>>> sun.management.Sensor.trigger(Sensor.java:120)
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>>
> >> > >> >> >>>>> Thanks in advance!
> >> > >> >> >>>>>
> >> > >> >> >>>>
> >> > >> >> >>>
> >> > >> >> >>
> >> > >> >> >
> >> > >> >> >
> >> > >> >> >
> >> > >> >> > --
> >> > >> >> > Best Regards,
> >> > >> >> > Ruslan Al-Fakikh
> >> > >> >> >
> >> > >> >>
> >> > >> >>
> >> > >> >
> >> > >>
> >> > >
> >> >
> >> >
> >>
> >
> >
>
>
>


Re: java.lang.OutOfMemoryError when using TOP udf

Posted by Jonathan Coveney <jc...@gmail.com>.
Ruslan, I took a look and it is being reasonable. I do that that that is
the issue: the way that it works is by holding a priority queue of however
many items you care about, adding one, then popping the bottom one. If it
has to hold almost 3M objects in memory, memory issues is a real likely
thing. A couple things you can do:

- have fewer columns. ie only do "TOP" of the things you really care about
- more memory (don't you love that?)

Others may have other suggestions.

2012/1/5 Ruslan Al-fakikh <ru...@jalent.ru>

> According to my calculations the biggest TOP number is 2380324
> Could that be the reason of failure in maps?
>
> -----Original Message-----
> From: Jonathan Coveney [mailto:jcoveney@gmail.com]
> Sent: 28 декабря 2011 г. 23:19
> To: user@pig.apache.org
> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
>
> How large is TopNumber? I imagine that if your TopNumber is large enough,
> the UDF could still fail if the TopNumber # of values can't fit in the
> priority queue it puts together. Although in that final merge it could be
> smarter about it... will have to check the code when I get a chance to see
> if they are.
>
> 2011/12/27 Ruslan Al-fakikh <ru...@jalent.ru>
>
> > Actually I fixed it. I had to use an additional grouping to make it
> > really Algebraic. But now I see OutOfMemory during Map merge:
> >
> > [2011-12-27 08:44:07] FATAL (Child.java:318) - Error running child :
> > java.lang.OutOfMemoryError: Java heap space
> >        at
> > org.apache.hadoop.mapred.IFile$Reader.readNextBlock(IFile.java:355)
> >        at org.apache.hadoop.mapred.IFile$Reader.next(IFile.java:417)
> >        at org.apache.hadoop.mapred.Merger$Segment.next(Merger.java:220)
> >        at
> org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:420)
> >        at
> org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
> >        at org.apache.hadoop.mapred.Merger.merge(Merger.java:77)
> >        at
> >
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1548)
> >        at
> > org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1180)
> >        at
> >
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:582)
> >        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649)
> >         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
> >        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >        at java.security.AccessController.doPrivileged(Native Method)
> >        at javax.security.auth.Subject.doAs(Subject.java:396)
> >        at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >        at org.apache.hadoop.mapred.Child.main(Child.java:264)
> >
> > Can anyone help?
> >
> > Thanks in advance!
> >
> > -----Original Message-----
> > From: Ruslan Al-fakikh [mailto:ruslan.al-fakikh@jalent.ru]
> > Sent: 22 декабря 2011 г. 5:38
> > To: user@pig.apache.org
> > Subject: RE: java.lang.OutOfMemoryError when using TOP udf
> >
> > Hey guys
> >
> > I did it according to the advice and moved the TOP execution the map
> > phase and now I am getting the same error, but now it comes from that
> map phase.
> >
> > Any help much appreciated!
> >
> > Here is my current code:
> > https://gist.github.com/1508511
> >
> > Error stack trace:
> > [2011-12-21 08:17:46] FATAL (Child.java:318) - Error running child :
> > java.lang.OutOfMemoryError: Java heap space
> >        at java.io.DataInputStream.readUTF(DataInputStream.java:644)
> >        at java.io.DataInputStream.readUTF(DataInputStream.java:547)
> >        at
> > org.apache.pig.data.BinInterSedes.readCharArray(BinInterSedes.java:210)
> >        at
> > org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:333)
> >        at
> > org.apache.pig.data.BinInterSedes.readDatum(BinInterSedes.java:251)
> >        at
> > org.apache.pig.data.BinInterSedes.addColsToTuple(BinInterSedes.java:555)
> >        at
> > org.apache.pig.data.BinSedesTuple.readFields(BinSedesTuple.java:64)
> >        at
> >
> org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.readFromFile(DefaultDataBag.java:244)
> >        at
> >
> org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.next(DefaultDataBag.java:231)
> >        at
> >
> org.apache.pig.data.DefaultDataBag$DefaultDataBagIterator.hasNext(DefaultDataBag.java:157)
> >        at org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
> >        at org.apache.pig.builtin.TOP.exec(TOP.java:116)
> >        at org.apache.pig.builtin.TOP.exec(TOP.java:65)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:245)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:287)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:338)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:240)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:237)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:232)
> >        at
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
> >        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> >        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
> >        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
> >        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >        at java.security.AccessController.doPrivileged(Native Method)
> >        at javax.security.auth.Subject.doAs(Subject.java:396)
> >        at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >        at org.apache.hadoop.mapred.Child.main(Child.java:264)
> >
> > -----Original Message-----
> > From: Dmitriy Ryaboy [mailto:dvryaboy@gmail.com]
> > Sent: 17 декабря 2011 г. 0:16
> > To: user@pig.apache.org
> > Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> >
> > I meant the latter, an actual join statement. So, generate the counts,
> > join them to the original relation, then group again and do TOP.
> >
> > D
> >
> > On Fri, Dec 16, 2011 at 5:32 AM, Ruslan Al-fakikh <
> > ruslan.al-fakikh@jalent.ru> wrote:
> > > Dmitriy,
> > >
> > > You wrote
> > >
> > >> > Ok so this:
> > >> >
> > >> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
> > >> >                                count = COUNT(thirdLevelsSummed);
> > >> >                                result = TOP( (int)(count *
> > >> > (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3,
> > >> > thirdLevelsSummed);
> > >> >                                GENERATE FLATTEN(result); }
> > >> >
> > >> > requires "count" to be calculated before TOP can be applied.
> > >> > Since count can't be calculated until the reduce side, naturally,
> > >> > TOP can't start working on the map side (as it doesn't know its
> > >> > arguments
> > yet). Try generating the counts * ($TLP + $BP) separately, joining
> > them in (I am guessing you have no more than a few K categories -- in
> > that case, you can do a replicated join), and then do group and TOP on.
> > >
> > > Probably I didn't understand your logic correctly. What I did is:
> > > changed this:
> > > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
> > >                                count = COUNT(thirdLevelsSummed);
> > >                                result = TOP( (int)(count * (double)
> > > ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3,
> > > thirdLevelsSummed);
> > >                                GENERATE FLATTEN(result); } to this:
> > > thirdLevelsTopNumberCounted = FOREACH thirdLevelsByCategory GENERATE
> > >
> > > group,
> > >
> > > thirdLevelsSummed,
> > >
> > > (int)( COUNT(thirdLevelsSummed) * (double) ($THIRD_LEVELS_PERCENTAGE
> > > +
> > > $BOTS_PERCENTAGE) ) AS TopNumber;
> > >
> > > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsTopNumberCounted
> > > GENERATE
> > >
> > > FLATTEN(TOP(TopNumber, 3, thirdLevelsSummed));
> > >
> > > So I removed the COUNT from the nested group. It didn't help.
> > > Probably
> > you meant the JOIN ... USING 'replicated' statement, but I didn't get
> > how I can apply it here.
> > >
> > > Thanks
> > >
> > > -----Original Message-----
> > > From: Ruslan Al-fakikh [mailto:ruslan.al-fakikh@jalent.ru]
> > > Sent: 24 ноября 2011 г. 15:56
> > > To: user@pig.apache.org
> > > Subject: RE: java.lang.OutOfMemoryError when using TOP udf
> > >
> > > Hm. Interesting. Yeah, I really haven't seen the error after setting
> > mapred.child.java.opts=-Xmx1024m.
> > > Probably I won't have to fix the Pig script:)
> > >
> > > -----Original Message-----
> > > From: Jonathan Coveney [mailto:jcoveney@gmail.com]
> > > Sent: 23 ноября 2011 г. 11:46
> > > To: user@pig.apache.org
> > > Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> > >
> > > I have seen issues with spilling if it had less than 1GB of heap.
> > > Once I
> > allocated enough ram, no issues. It seems unlikely to me that the bag
> > implementation fails on this because it's such a common use and nobody
> > has reported an error, and running with less than 1GB of heap is
> > definitely not recommended. Very curious if the error crops up again.
> > >
> > > 2011/11/22 pablomar <pa...@gmail.com>
> > >
> > >> just a guess .. could it be possible that the Bag is kept in memory
> > >> instead of being spilled to disk ?
> > >> browsing the code of InternalCachedBag, I saw:
> > >>
> > >> private void init(int bagCount, float percent) {
> > >>        factory = TupleFactory.getInstance();
> > >>        mContents = new ArrayList<Tuple> <
> > >> http://javasourcecode.org/html/open-source/jdk/jdk-6u23/java/util/A
> > >> rr
> > >> a
> > >> yList.java.html
> > >> >();
> > >>
> > >>        long max = Runtime.getRuntime().maxMemory();
> > >>        maxMemUsage = (long)(((float)max * percent) /
> > >> (float)bagCount);
> > >>        cacheLimit = Integer.MAX_VALUE;
> > >>
> > >>        // set limit to 0, if memusage is 0 or really really small.
> > >>    // then all tuples are put into disk        if (maxMemUsage < 1) {
> > >>            cacheLimit = 0;
> > >>        }
> > >>
> > >>        addDone = false;
> > >>    }
> > >>
> > >> my guess is the cacheLimit was set to Integer.MAX_VALUE and it's
> > >> trying to keep all in memory when it is not big enough but not so
> > >> small to have cacheLimit reset to 0
> > >>
> > >>
> > >>
> > >>
> > >> On Tue, Nov 22, 2011 at 10:08 AM, Ruslan Al-fakikh <
> > >> ruslan.al-fakikh@jalent.ru> wrote:
> > >>
> > >> > Jonathan,
> > >> >
> > >> > I am running it on Prod cluster in MR mode, not locally. I
> > >> > started to see the issue when input size grew. A few days ago I
> > >> > found a workaround of putting this property:
> > >> > mapred.child.java.opts=-Xmx1024m
> > >> > But I think this is a temporary solution and the job will fail
> > >> > when the input size will grow again.
> > >> >
> > >> > Dmitriy,
> > >> >
> > >> > Thanks a lot for the investigation. I'll try it.
> > >> >
> > >> > -----Original Message-----
> > >> > From: Dmitriy Ryaboy [mailto:dvryaboy@gmail.com]
> > >> > Sent: 22 ноября 2011 г. 2:21
> > >> > To: user@pig.apache.org
> > >> > Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> > >> >
> > >> > Ok so this:
> > >> >
> > >> > thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
> > >> >                                count = COUNT(thirdLevelsSummed);
> > >> >                                result = TOP( (int)(count *
> > >> > (double) ($THIRD_LEVELS_PERCENTAGE +
> > >> > $BOTS_PERCENTAGE) ), 3, thirdLevelsSummed);
> > >> >                                GENERATE FLATTEN(result); }
> > >> >
> > >> > requires "count" to be calculated before TOP can be applied.
> > >> > Since count can't be calculated until the reduce side, naturally,
> > >> > TOP can't start working on the map side (as it doesn't know its
> > >> > arguments
> > yet).
> > >> >
> > >> > Try generating the counts * ($TLP + $BP) separately, joining them
> > >> > in (I
> > >> am
> > >> > guessing you have no more than a few K categories -- in that
> > >> > case, you
> > >> can
> > >> > do a replicated join), and then do group and TOP on.
> > >> >
> > >> > On Mon, Nov 21, 2011 at 1:53 PM, Jonathan Coveney
> > >> > <jc...@gmail.com>
> > >> > wrote:
> > >> > > You're right pablomar...hmm
> > >> > >
> > >> > > Ruslan: are you running this in mr mode on a cluster, or locally?
> > >> > >
> > >> > > I'm noticing this:
> > >> > > [2011-11-16 12:34:55] INFO  (SpillableMemoryManager.java:154) -
> > >> > > first memory handler call- Usage threshold init =
> > >> > > 175308800(171200K) used =
> > >> > > 373454552(364701K) committed = 524288000(512000K) max =
> > >> > > 524288000(512000K)
> > >> > >
> > >> > > It looks like your max memory is 512MB. I've had issues with
> > >> > > bag spilling with less than 1GB allocated (-Xmx1024mb).
> > >> > >
> > >> > > 2011/11/21 pablomar <pa...@gmail.com>
> > >> > >
> > >> > >> i might be wrong, but it seems the error comes from
> > >> > >> while(itr.hasNext())
> > >> > >> not from the add to the queue
> > >> > >> so i don't think it is related to the number of elements in
> > >> > >> the queue ... maybe the field lenght?
> > >> > >>
> > >> > >> On 11/21/11, Jonathan Coveney <jc...@gmail.com> wrote:
> > >> > >> > Internally, TOP is using a priority queue. It tries to be
> > >> > >> > smart about pulling off excess elements, but if you ask it
> > >> > >> > for enough elements, it
> > >> > >> can
> > >> > >> > blow up, because the priority queue is going to have n
> > >> > >> > elements, where n
> > >> > >> is
> > >> > >> > the ranking you want. This is consistent with the stack
> > >> > >> > trace, which died on updateTop which is when elements are
> > >> > >> > added to the
> > >> > priority queue.
> > >> > >> >
> > >> > >> > Ruslan, how large are the limits you're setting? ie
> > >> > >> > (int)(count
> > >> > >> > *
> > >> > >> (double)
> > >> > >> > ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) )
> > >> > >> >
> > >> > >> > As far as TOP's implementation, I imagine you could get
> > >> > >> > around the issue
> > >> > >> by
> > >> > >> > using a sorted data bag, but that might be much slower. hmm.
> > >> > >> >
> > >> > >> > 2011/11/21 Ruslan Al-fakikh <ru...@jalent.ru>
> > >> > >> >
> > >> > >> >> Ok. Here it is:
> > >> > >> >> https://gist.github.com/1383266
> > >> > >> >>
> > >> > >> >> -----Original Message-----
> > >> > >> >> From: Dmitriy Ryaboy [mailto:dvryaboy@gmail.com]
> > >> > >> >> Sent: 21 ноября 2011 г. 20:32
> > >> > >> >> To: user@pig.apache.org
> > >> > >> >> Subject: Re: java.lang.OutOfMemoryError when using TOP udf
> > >> > >> >>
> > >> > >> >> Ruslan, I think the mailing list is set to reject
> > >> > >> >> attachments
> > >> > >> >> -- can you post it as a github gist or something similar,
> > >> > >> >> and send a
> > >> > link?
> > >> > >> >>
> > >> > >> >> D
> > >> > >> >>
> > >> > >> >> On Mon, Nov 21, 2011 at 6:11 AM, Ruslan Al-Fakikh
> > >> > >> >> <ru...@jalent.ru> wrote:
> > >> > >> >> > Hey Dmitriy,
> > >> > >> >> >
> > >> > >> >> > I attached the script. It is not a plain-pig script,
> > >> > >> >> > because I make some preprocessing before submitting it to
> > >> > >> >> > cluster, but the general idea of what I submit is clear.
> > >> > >> >> >
> > >> > >> >> > Thanks in advance!
> > >> > >> >> >
> > >> > >> >> > On Fri, Nov 18, 2011 at 12:07 AM, Dmitriy Ryaboy
> > >> > >> >> > <dv...@gmail.com>
> > >> > >> >> wrote:
> > >> > >> >> >> Ok, so it's something in the rest of the script that's
> > >> > >> >> >> causing this to happen. Ruslan, if you send your script,
> > >> > >> >> >> I can probably figure out why (usually, it's using
> > >> > >> >> >> another, non-agebraic udf in your foreach, or for pig
> > >> > >> >> >> 0.8, generating a constant in the
> > >> > foreach).
> > >> > >> >> >>
> > >> > >> >> >> D
> > >> > >> >> >>
> > >> > >> >> >> On Thu, Nov 17, 2011 at 9:59 AM, pablomar
> > >> > >> >> >> <pa...@gmail.com> wrote:
> > >> > >> >> >>> according to the stack trace, the algebraic is not
> > >> > >> >> >>> being used it says
> > >> > >> >> >>> updateTop(Top.java:139)
> > >> > >> >> >>> exec(Top.java:116)
> > >> > >> >> >>>
> > >> > >> >> >>> On 11/17/11, Dmitriy Ryaboy <dv...@gmail.com> wrote:
> > >> > >> >> >>>> The top udf does not try to process all data in memory
> > >> > >> >> >>>> if the algebraic optimization can be applied. It does
> > >> > >> >> >>>> need to keep the topn numbers in memory of course. Can
> > >> > >> >> >>>> you confirm algebraic mode is
> > >> > >> >> used?
> > >> > >> >> >>>>
> > >> > >> >> >>>> On Nov 17, 2011, at 6:13 AM, "Ruslan Al-fakikh"
> > >> > >> >> >>>> <ru...@jalent.ru>
> > >> > >> >> >>>> wrote:
> > >> > >> >> >>>>
> > >> > >> >> >>>>> Hey guys,
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> I encounter java.lang.OutOfMemoryError when using TOP
> udf.
> > >> > >> >> >>>>> It seems that the udf tries to process all data in
> memory.
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> Is there a workaround for TOP? Or maybe there is some
> > >> > >> >> >>>>> other way of getting top results? I cannot use LIMIT
> > >> > >> >> >>>>> since I need to 5% of data, not a constant number of
> rows.
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> I am using:
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> Apache Pig version 0.8.1-cdh3u2 (rexported)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> The stack trace is:
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> [2011-11-16 12:34:55] INFO  (CodecPool.java:128) -
> > >> > >> >> >>>>> Got brand-new decompressor
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> [2011-11-16 12:34:55] INFO  (Merger.java:473) - Down
> > >> > >> >> >>>>> to the last merge-pass, with 21 segments left of total
> size:
> > >> > >> >> >>>>> 2057257173 bytes
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> [2011-11-16 12:34:55] INFO
> > >> > >> >> >>>>> (SpillableMemoryManager.java:154) - first memory
> > >> > >> >> >>>>> handler
> > >> > >> >> >>>>> call- Usage threshold init =
> > >> > >> >> >>>>> 175308800(171200K) used =
> > >> > >> >> >>>>> 373454552(364701K) committed = 524288000(512000K) max
> > >> > >> >> >>>>> =
> > >> > >> >> >>>>> 524288000(512000K)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> [2011-11-16 12:36:22] INFO
> > >> > >> >> >>>>> (SpillableMemoryManager.java:167) - first memory
> > >> > >> >> >>>>> handler call - Collection threshold init =
> > >> > >> >> >>>>> 175308800(171200K) used =
> > >> > >> >> >>>>> 496500704(484863K) committed = 524288000(512000K) max
> > >> > >> >> >>>>> =
> > >> > >> >> >>>>> 524288000(512000K)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> [2011-11-16 12:37:28] INFO
> > >> > >> >> >>>>> (TaskLogsTruncater.java:69)
> > >> > >> >> >>>>> - Initializing logs'
> > >> > >> >> >>>>> truncater with mapRetainSize=-1 and
> > >> > >> >> >>>>> reduceRetainSize=-1
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> [2011-11-16 12:37:28] FATAL (Child.java:318) - Error
> > >> > >> >> >>>>> running
> > >> > >> child :
> > >> > >> >> >>>>> java.lang.OutOfMemoryError: Java heap space
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> java.util.Arrays.copyOfRange(Arrays.java:3209)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> java.lang.String.<init>(String.java:215)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> java.io.DataInputStream.readUTF(DataInputStream.java:
> > >> > >> >> >>>>> 64
> > >> > >> >> >>>>> 4
> > >> > >> >> >>>>> )
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> java.io.DataInputStream.readUTF(DataInputStream.java:
> > >> > >> >> >>>>> 54
> > >> > >> >> >>>>> 7
> > >> > >> >> >>>>> )
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.readCharArray(BinIn
> > >> > >> >> >>>>> te
> > >> > >> >> >>>>> r
> > >> > >> >> >>>>> Sede
> > >> > >> >> >>>>> s.java
> > >> > >> >> >>>>> :210)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.readDatum(BinInterS
> > >> > >> >> >>>>> ed
> > >> > >> >> >>>>> e
> > >> > >> >> >>>>> s.ja
> > >> > >> >> >>>>> va:333
> > >> > >> >> >>>>> )
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.readDatum(BinInterS
> > >> > >> >> >>>>> ed
> > >> > >> >> >>>>> e
> > >> > >> >> >>>>> s.ja
> > >> > >> >> >>>>> va:251
> > >> > >> >> >>>>> )
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.data.BinInterSedes.addColsToTuple(BinI
> > >> > >> >> >>>>> nt
> > >> > >> >> >>>>> e
> > >> > >> >> >>>>> rSed
> > >> > >> >> >>>>> es.jav
> > >> > >> >> >>>>> a:555)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.data.BinSedesTuple.readFields(BinSedes
> > >> > >> >> >>>>> Tu
> > >> > >> >> >>>>> p
> > >> > >> >> >>>>> le.j
> > >> > >> >> >>>>> ava:64
> > >> > >> >> >>>>> )
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.data.InternalCachedBag$CachedBagIterator.
> > >> > >> >> >>>>> hasN
> > >> > >> >> >>>>> ext(In
> > >> > >> >> >>>>> ternalCach
> > >> > >> >> >>>>> edBag.java:237)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.builtin.TOP.updateTop(TOP.java:139)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.builtin.TOP.exec(TOP.java:116)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.builtin.TOP.exec(TOP.java:65)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> > >> > >> >> >>>>> expres
> > >> > >> >> >>>>> sionOperat
> > >> > >> >> >>>>> ors.POUserFunc.getNext(POUserFunc.java:245)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> > >> > >> >> >>>>> expres
> > >> > >> >> >>>>> sionOperat
> > >> > >> >> >>>>> ors.POUserFunc.getNext(POUserFunc.java:287)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> > >> > >> >> >>>>> relati
> > >> > >> >> >>>>> onalOperat
> > >> > >> >> >>>>> ors.POForEach.processPlan(POForEach.java:338)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> > >> > >> >> >>>>> relati
> > >> > >> >> >>>>> onalOperat
> > >> > >> >> >>>>> ors.POForEach.getNext(POForEach.java:290)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> > >> > >> >> >>>>> Physic
> > >> > >> >> >>>>> alOperator
> > >> > >> >> >>>>> .processInput(PhysicalOperator.java:276)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > org.apache.pig.backend.hadoop.executionengine.physicalLayer.
> > >> > >> >> >>>>> relati
> > >> > >> >> >>>>> onalOperat
> > >> > >> >> >>>>> ors.POForEach.getNext(POForEach.java:240)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> > >> > >> >> >>>>> ce
> > >> > >> >> >>>>> L
> > >> > >> >> >>>>> ayer
> > >> > >> >> >>>>> .PigMa
> > >> > >> >> >>>>> pReduce$Re
> > >> > >> >> >>>>> duce.runPipeline(PigMapReduce.java:434)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> > >> > >> >> >>>>> ce
> > >> > >> >> >>>>> L
> > >> > >> >> >>>>> ayer
> > >> > >> >> >>>>> .PigMa
> > >> > >> >> >>>>> pReduce$Re
> > >> > >> >> >>>>> duce.processOnePackageOutput(PigMapReduce.java:402)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> > >> > >> >> >>>>> ce
> > >> > >> >> >>>>> L
> > >> > >> >> >>>>> ayer
> > >> > >> >> >>>>> .PigMa
> > >> > >> >> >>>>> pReduce$Re
> > >> > >> >> >>>>> duce.reduce(PigMapReduce.java:382)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.pig.backend.hadoop.executionengine.mapRedu
> > >> > >> >> >>>>> ce
> > >> > >> >> >>>>> L
> > >> > >> >> >>>>> ayer
> > >> > >> >> >>>>> .PigMa
> > >> > >> >> >>>>> pReduce$Re
> > >> > >> >> >>>>> duce.reduce(PigMapReduce.java:251)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:
> > >> > >> >> >>>>> 17
> > >> > >> >> >>>>> 6
> > >> > >> >> >>>>> )
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > >> > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:
> > >> > >> >> >>>>> 572)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:
> > >> > >> >> >>>>> 414)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> java.security.AccessController.doPrivileged(Native
> > >> > >> >> >>>>> Method)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> javax.security.auth.Subject.doAs(Subject.java:396)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.hadoop.security.UserGroupInformation.doAs(
> > >> > >> >> >>>>> Us
> > >> > >> >> >>>>> e
> > >> > >> >> >>>>> rGro
> > >> > >> >> >>>>> upInfo
> > >> > >> >> >>>>> rmation.ja
> > >> > >> >> >>>>> va:1127)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> org.apache.hadoop.mapred.Child.main(Child.java:264)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> stderr logs
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> Exception in thread "Low Memory Detector"
> > >> > >> >> >>>>> java.lang.OutOfMemoryError: Java heap space
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> sun.management.MemoryNotifInfoCompositeData.getCompos
> > >> > >> >> >>>>> it
> > >> > >> >> >>>>> e
> > >> > >> >> >>>>> Data
> > >> > >> >> >>>>> (Memor
> > >> > >> >> >>>>> yNotifInfo
> > >> > >> >> >>>>> CompositeData.java:42)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> sun.management.MemoryNotifInfoCompositeData.toComposi
> > >> > >> >> >>>>> te
> > >> > >> >> >>>>> D
> > >> > >> >> >>>>> ata(
> > >> > >> >> >>>>> Memory
> > >> > >> >> >>>>> NotifInfoC
> > >> > >> >> >>>>> ompositeData.java:36)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> sun.management.MemoryImpl.createNotification(MemoryImpl.
> > >> > >> >> >>>>> java
> > >> > >> >> >>>>> :168)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>>
> > >> > >> >>
> > >> > >> >>
> > >> > >>
> > >> >
> > >>
> >
> sun.management.MemoryPoolImpl$CollectionSensor.triggerAction(MemoryPoolImpl.
> > >> > >> >> >>>>> java:300)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>                at
> > >> > >> >> >>>>> sun.management.Sensor.trigger(Sensor.java:120)
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>>
> > >> > >> >> >>>>> Thanks in advance!
> > >> > >> >> >>>>>
> > >> > >> >> >>>>
> > >> > >> >> >>>
> > >> > >> >> >>
> > >> > >> >> >
> > >> > >> >> >
> > >> > >> >> >
> > >> > >> >> > --
> > >> > >> >> > Best Regards,
> > >> > >> >> > Ruslan Al-Fakikh
> > >> > >> >> >
> > >> > >> >>
> > >> > >> >>
> > >> > >> >
> > >> > >>
> > >> > >
> > >> >
> > >> >
> > >>
> > >
> > >
> >
> >
> >
>
>