You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Sheng Guo <en...@gmail.com> on 2012/07/02 20:42:28 UTC

What is the best way to do counting in pig?

Hi all,

I used to use the following pig script to do the counting of the records.

m_skill_group = group m_skills_filter by member_id;
grpd = group m_skill_group all;
cnt = foreach grpd generate COUNT(m_skill_group);

cnt_filter = limit cnt 10;
dump cnt_filter;


but sometimes, when the records get larger, it takes lots of time and hang
up, and or die.
I thought counting should be simple enough, so what is the best way to do a
counting in pig?

Thanks!

Sheng

Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
The code you posted should be performant. a group all -> count is quite
fast, so my guess is that there is something else going on. can you paste
your whole script?

2012/7/2 Sheng Guo <en...@gmail.com>

> No. I try to figure out how many records (rows) in 'm_skill_group' table.
> (That limit statement actually is not necessary)
>
> Thanks!
>
>
> On Mon, Jul 2, 2012 at 1:20 PM, Jonathan Coveney <jc...@gmail.com>
> wrote:
>
> > Is your goal to have the 10 largest rows by member_id?
> >
> > 2012/7/2 Sheng Guo <en...@gmail.com>
> >
> > > Hi all,
> > >
> > > I used to use the following pig script to do the counting of the
> records.
> > >
> > > m_skill_group = group m_skills_filter by member_id;
> > > grpd = group m_skill_group all;
> > > cnt = foreach grpd generate COUNT(m_skill_group);
> > >
> > > cnt_filter = limit cnt 10;
> > > dump cnt_filter;
> > >
> > >
> > > but sometimes, when the records get larger, it takes lots of time and
> > hang
> > > up, and or die.
> > > I thought counting should be simple enough, so what is the best way to
> > do a
> > > counting in pig?
> > >
> > > Thanks!
> > >
> > > Sheng
> > >
> >
>

Re: What is the best way to do counting in pig?

Posted by Sheng Guo <en...@gmail.com>.
No. I try to figure out how many records (rows) in 'm_skill_group' table.
(That limit statement actually is not necessary)

Thanks!


On Mon, Jul 2, 2012 at 1:20 PM, Jonathan Coveney <jc...@gmail.com> wrote:

> Is your goal to have the 10 largest rows by member_id?
>
> 2012/7/2 Sheng Guo <en...@gmail.com>
>
> > Hi all,
> >
> > I used to use the following pig script to do the counting of the records.
> >
> > m_skill_group = group m_skills_filter by member_id;
> > grpd = group m_skill_group all;
> > cnt = foreach grpd generate COUNT(m_skill_group);
> >
> > cnt_filter = limit cnt 10;
> > dump cnt_filter;
> >
> >
> > but sometimes, when the records get larger, it takes lots of time and
> hang
> > up, and or die.
> > I thought counting should be simple enough, so what is the best way to
> do a
> > counting in pig?
> >
> > Thanks!
> >
> > Sheng
> >
>

Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
Is your goal to have the 10 largest rows by member_id?

2012/7/2 Sheng Guo <en...@gmail.com>

> Hi all,
>
> I used to use the following pig script to do the counting of the records.
>
> m_skill_group = group m_skills_filter by member_id;
> grpd = group m_skill_group all;
> cnt = foreach grpd generate COUNT(m_skill_group);
>
> cnt_filter = limit cnt 10;
> dump cnt_filter;
>
>
> but sometimes, when the records get larger, it takes lots of time and hang
> up, and or die.
> I thought counting should be simple enough, so what is the best way to do a
> counting in pig?
>
> Thanks!
>
> Sheng
>

Re: What is the best way to do counting in pig?

Posted by Sheng Guo <en...@gmail.com>.
I guess that's the reason, using single reducer may cause some problem when
the data is huge, the counting is very time-consuming or even die at the
end.

What do you mean by counting star to null fileds? can you explain a little
more on this? what is the difference between this one and the standard one
in terms of job execution?

Thanks!

On Mon, Jul 2, 2012 at 1:51 PM, Subir S <su...@gmail.com> wrote:

> Group all - uses single reducer AFAIU. You can try to count per group
> and sum may be.
>
> You may also try with COUNT_STAR to include NULL fields.
>
> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
> > Hi all,
> >
> > I used to use the following pig script to do the counting of the records.
> >
> > m_skill_group = group m_skills_filter by member_id;
> > grpd = group m_skill_group all;
> > cnt = foreach grpd generate COUNT(m_skill_group);
> >
> > cnt_filter = limit cnt 10;
> > dump cnt_filter;
> >
> >
> > but sometimes, when the records get larger, it takes lots of time and
> hang
> > up, and or die.
> > I thought counting should be simple enough, so what is the best way to
> do a
> > counting in pig?
> >
> > Thanks!
> >
> > Sheng
> >
>

Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
I agree that more documentation around parameters would be good, and force
some consistency. I saw you made one ticket, can you make a ticket for that
as well?

Good find :)

2012/7/10 Haitao Yao <ya...@gmail.com>

> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because
> every time the InternalCachedBag spills, It creates a new tmp file in
> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple
> added into InternalCachedBag will create a new tmp file. And the tmp file
> is deleted on exit.
> So , if you're unlucky like me, you will get a OOM Exception caused by
> java.io.DeleteOnExitHook!
> Here's the evidence:
>
> God, we really need a full description of how every parameter works.
>
>
>
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> 在 2012-7-10,下午4:20, Haitao Yao 写道:
>
> I found the solution.
>
> After analyzing the heap dump while the reducer OOM, I found out the
> memory is consumed by org.apache.pig.data.InternalCachedBag , here's the
> diagram:
> <cc.jpg>
>
> In the source code of org.apache.pig.data.InternalCachedBag, I found out
> there's a parameter for the cache limit:
>  *public* InternalCachedBag(*int* bagCount) {
>         *float* percent = 0.2F;
>
>     *if* (PigMapReduce.*sJobConfInternal*.get() != *null*) {
> // here, the cache limit is from here!
>     String usage = PigMapReduce.*sJobConfInternal*.get().get(
> "pig.cachedbag.memusage");
>     *if* (usage != *null*) {
>     percent = Float.*parseFloat*(usage);
>     }
>     }
>
>         init(bagCount, percent);
>     }
>     *private* *void* init(*int* bagCount, *float* percent) {
>     factory = TupleFactory.*getInstance*();
>     mContents = *new* ArrayList<Tuple>();
>
>     *long* max = Runtime.*getRuntime*().maxMemory();
>         maxMemUsage = (*long*)(((*float*)max * percent) / (*float*
> )bagCount);
>         cacheLimit = Integer.*MAX_VALUE*;
>
>         // set limit to 0, if memusage is 0 or really really small.
>         // then all tuples are put into disk
>         *if* (maxMemUsage < 1) {
>         cacheLimit = 0;
>         }
>         *log*.warn("cacheLimit: " + *this*.cacheLimit);
>         addDone = *false*;
>     }
>
> so, after write pig.cachedbag.memusage=0 into
> $PIG_HOME/conf/pig.properties, my job successes!
>
> You can also set to an appropriate value to fully utilize your memory as a
> cache.
>
> Hope this is useful for others.
> Thanks.
>
>
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> 在 2012-7-10,下午1:06, Haitao Yao 写道:
>
> my reducers get 512 MB, -Xms512M -Xmx512M.
> The reducer does not get OOM when manually invoke spill in my case.
>
> Can you explain more about your solution?
> And can your solution fit into 512MB reducer process?
> Thanks very much.
>
>
>
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
>
> I have something in the mix that should reduce bag memory :) Question: how
> much memory are your reducers getting? In my experience, you'll get OOM's
> on spilling if you have allocated less than a gig to the JVM
>
> 2012/7/9 Haitao Yao <ya...@gmail.com>
>
> I have encountered the similar problem.  And I got a OOM while running the
>
> reducer.
>
> I think the reason is the data bag generated after group all is too big to
>
> fit into the reducer's memory.
>
>
> and I have written a new COUNT implementation with explicit invoke
>
> System.gc() and spill  after the COUNT function finish its job, but it
>
> still get OOM
>
>
> here's the code of the new COUNT implementation:
>
>        @Override
>
>        public Long exec(Tuple input) throws IOException {
>
>                DataBag bag = (DataBag)input.get(0);
>
>                Long result = super.exec(input);
>
>                LOG.warn(" before spill data bag memory : " +
>
> Runtime.getRuntime().freeMemory());
>
>                bag.spill();
>
>                System.gc();
>
>                LOG.warn(" after spill data bag memory : " +
>
> Runtime.getRuntime().freeMemory());
>
>                LOG.warn("big bag size: " + bag.size() + ", hashcode: " +
>
> bag.hashCode());
>
>                return result;
>
>        }
>
>
>
> I think we have to redesign the data bag implementation with less memory
>
> consumed.
>
>
>
>
> Haitao Yao
>
> yao.erix@gmail.com
>
> weibo: @haitao_yao
>
> Skype:  haitao.yao.final
>
>
> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>
>
> the pig script:
>
>
> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>
>
> grpall = group longDesc all;
>
> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>
> explain cnt;
>
>
>
> the dump relation result:
>
>
> #-----------------------------------------------
>
> # New Logical Plan:
>
> #-----------------------------------------------
>
> cnt: (Name: LOStore Schema: allNumber#65:long)
>
> |
>
> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>
>   |   |
>
>   |   (Name: LOGenerate[false] Schema:
>
> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>
>   |   |   |
>
>   |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
>
> 65)
>
>   |   |   |
>
>   |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
>
> (*))
>
>   |   |
>
>   |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>
>
>
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>
>   |
>
>   |---grpall: (Name: LOCogroup Schema:
>
>
>
> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>
>       |   |
>
>       |   (Name: Constant Type: chararray Uid: 62)
>
>       |
>
>       |---longDesc: (Name: LOLoad Schema:
>
>
>
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>
>
> #-----------------------------------------------
>
> # Physical Plan:
>
> #-----------------------------------------------
>
> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>
> |
>
> |---cnt: New For Each(false)[bag] - scope-8
>
>   |   |
>
>   |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>
>   |   |
>
>   |   |---Project[bag][1] - scope-5
>
>   |
>
>   |---grpall: Package[tuple]{chararray} - scope-2
>
>       |
>
>       |---grpall: Global Rearrange[tuple] - scope-1
>
>           |
>
>           |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>
>               |   |
>
>               |   Constant(all) - scope-4
>
>               |
>
>               |---longDesc:
>
> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>
>
> 2012-07-09 15:47:02,441 [main] INFO
>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
>
> File concatenation threshold: 100 optimistic? false
>
> 2012-07-09 15:47:02,448 [main] INFO
>
>
>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>
> - Choosing to move algebraic foreach to combiner
>
> 2012-07-09 15:47:02,581 [main] INFO
>
>
>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>
> - MR plan size before optimization: 1
>
> 2012-07-09 15:47:02,581 [main] INFO
>
>
>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>
> - MR plan size after optimization: 1
>
> #--------------------------------------------------
>
> # Map Reduce Plan
>
> #--------------------------------------------------
>
> MapReduce node scope-10
>
> Map Plan
>
> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>
> |   |
>
> |   Project[chararray][0] - scope-23
>
> |
>
> |---cnt: New For Each(false,false)[bag] - scope-11
>
>   |   |
>
>   |   Project[chararray][0] - scope-12
>
>   |   |
>
>   |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>
>   |   |
>
>   |   |---Project[bag][1] - scope-14
>
>   |
>
>   |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>
>       |
>
>       |---longDesc:
>
> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
>
> Combine Plan
>
> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>
> |   |
>
> |   Project[chararray][0] - scope-27
>
> |
>
> |---cnt: New For Each(false,false)[bag] - scope-15
>
>   |   |
>
>   |   Project[chararray][0] - scope-16
>
>   |   |
>
>   |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>
> scope-17
>
>   |   |
>
>   |   |---Project[bag][1] - scope-18
>
>   |
>
>   |---POCombinerPackage[tuple]{chararray} - scope-20--------
>
> Reduce Plan
>
> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>
> |
>
> |---cnt: New For Each(false)[bag] - scope-8
>
>   |   |
>
>   |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>
>   |   |
>
>   |   |---Project[bag][1] - scope-19
>
>   |
>
>   |---POCombinerPackage[tuple]{chararray} - scope-28--------
>
> Global sort: false
>
> ----------------
>
>
>
>
> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com>
>
> wrote:
>
>
> instead of doing "dump relation," do "explain relation" (then run
>
> identically) and paste the output here. It will show whether the
>
> combiner
>
> is being used,
>
>
> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
>
>
> Hi,
>
>
> As it was said, COUNT is algebraic and should be fast, because it
>
> forces combiner. You should make sure that combiner is really used
>
> here. It can be disabled in some situations. I've encountered such
>
> situations many times when a job is tooo heavy in case no combiner is
>
> applied.
>
>
> Ruslan
>
>
> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
>
> wrote:
>
> Right!!
>
>
> Since it is mentioned that job is hanging, wild guess is it must be
>
> 'group all'. How can that be confirmed?
>
>
> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
>
> group all uses a single reducer, but COUNT is algebraic, and as such,
>
> will
>
> use combiners, so it is generally quite fast.
>
>
> 2012/7/2 Subir S <su...@gmail.com>
>
>
> Group all - uses single reducer AFAIU. You can try to count per
>
> group
>
> and sum may be.
>
>
> You may also try with COUNT_STAR to include NULL fields.
>
>
> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>
> Hi all,
>
>
> I used to use the following pig script to do the counting of the
>
> records.
>
>
> m_skill_group = group m_skills_filter by member_id;
>
> grpd = group m_skill_group all;
>
> cnt = foreach grpd generate COUNT(m_skill_group);
>
>
> cnt_filter = limit cnt 10;
>
> dump cnt_filter;
>
>
>
> but sometimes, when the records get larger, it takes lots of time
>
> and
>
> hang
>
> up, and or die.
>
> I thought counting should be simple enough, so what is the best way
>
> to
>
> do a
>
> counting in pig?
>
>
> Thanks!
>
>
> Sheng
>
>
>
>
>
>
>
> --
>
> Best Regards,
>
> Ruslan Al-Fakikh
>
>
>
>
>
>
>
>
>

Re: What is the best way to do counting in pig?

Posted by Haitao Yao <ya...@gmail.com>.
Hey, all, I've submitted the patch for PIG-2182, here's the link: https://issues.apache.org/jira/browse/PIG-2812

I didn't change the data bags to spill into only one file, since that's a very big modification. I just let the DefaultAbstractDataBag spill into one directory and delete the directory recursively with ShutdownHook.




Haitao Yao
yao.erix@gmail.com
weibo: @haitao_yao
Skype:  haitao.yao.final

在 2012-7-12,上午11:04, Haitao Yao 写道:

> Sorry. here's the full mail. 
> 
> 
> > Is your query using combiner ?
> 	I did know how to explicitly use combiner.
> 
> > Can you send the explain plan output ?
> 	The explain result is in the attachment. It's a little long. link: http://pastebin.com/Q6CvKiP1
> 	
> <aa.explain>
> 
> 
> > Does the heap information say how many entries are there in the
> InteralCachedBag's ArrayList ?
> 	There's 6 big Array lists, and the size is about 372692
> 	Here's the screen snapshot of the heap dump:
> 
> 	screen snapshot 1: you can see there's 6 big POForeEach instances
> 	
> <aa.jpg>
> 
> 
> 		screen snapshot 2: you can see the memory are mostly retained by the big array list.
> 		
> <bb.jpg>
> 
> 
> 		screen snapshot 3: you can see the big array list is referenced by InternalCachedBag: 
> 			
> <cc.jpg>
> 
> 	
> > What version of pig are you using?
> 	pig-0.9.2, I've read the latest source code of pig from github, and I don't find any improvements on IntercalCachedBag
> 
> 
> 
> 在 2012-7-12,上午10:58, Jonathan Coveney 写道:
> 
>> the listserv strips attachments. you'll have to host it somewhere else and
>> link it
>> 
>> 2012/7/11 Haitao Yao <ya...@gmail.com>
>> 
>>> Sorry , I sent the mail only to Thejas.
>>> 
>>> Resend it for all.
>>> 
>>> 
>>> Haitao Yao
>>> yao.erix@gmail.com
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>> 
>>> 在 2012-7-12,上午10:41, Haitao Yao 写道:
>>> 
>>>> 
>>>> 
>>>>> Is your query using combiner ?
>>>>      I did know how to explicitly use combiner.
>>>> 
>>>>> Can you send the explain plan output ?
>>>>      The explain result is in the attachment. It's a little long.
>>>> 
>>>> <aa.explain>
>>>> 
>>>>> Does the heap information say how many entries are there in the
>>>> InteralCachedBag's ArrayList ?
>>>>      There's 6 big Array lists, and the size is about 372692
>>>>      Here's the screen snapshot of the heap dump:
>>>> 
>>>>      screen snapshot 1: you can see there's 6 big POForeEach instances
>>>> 
>>>> <aa.jpg>
>>>> 
>>>>              screen snapshot 2: you can see the memory are mostly
>>> retained by the big array list.
>>>> 
>>>> <bb.jpg>
>>>> 
>>>>              screen snapshot 3: you can see the big array list is
>>> referenced by InternalCachedBag:
>>>> 
>>>> <cc.jpg>
>>>> 
>>>>> What version of pig are you using?
>>>>      pig-0.9.2, I've read the latest source code of pig from github,
>>> and I don't find any improvements on IntercalCachedBag.
>>>> 
>>>> 
>>>> Haitao Yao
>>>> yao.erix@gmail.com
>>>> weibo: @haitao_yao
>>>> Skype:  haitao.yao.final
>>>> 
>>>> 在 2012-7-12,上午8:56, Thejas Nair 写道:
>>>> 
>>>>> Haitao,
>>>>> Is your query using combiner ? Can you send the explain plan output ?
>>>>> Does the heap information say how many entries are there in the
>>>>> InteralCachedBag's ArrayList ?
>>>>> What version of pig are you using ?
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Thejas
>>>>> 
>>>>> 
>>>>> On 7/10/12 11:50 PM, Haitao Yao wrote:
>>>>>> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because
>>>>>> every time the InternalCachedBag spills, It creates a new tmp file in
>>>>>> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple
>>>>>> added into InternalCachedBag will create a new tmp file. And the tmp
>>>>>> file is deleted on exit.
>>>>>> So , if you're unlucky like me, you will get a OOM Exception caused by
>>>>>> java.io.DeleteOnExitHook!
>>>>>> Here's the evidence:
>>>>>> 
>>>>>> God, we really need a full description of how every parameter works.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Haitao Yao
>>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>>> weibo: @haitao_yao
>>>>>> Skype:  haitao.yao.final
>>>>>> 
>>>>>> 在 2012-7-10,下午4:20, Haitao Yao 写道:
>>>>>> 
>>>>>>> I found the solution.
>>>>>>> 
>>>>>>> After analyzing the heap dump while the reducer OOM, I found out the
>>>>>>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's
>>>>>>> the diagram:
>>>>>>> <cc.jpg>
>>>>>>> 
>>>>>>> In the source code of org.apache.pig.data.InternalCachedBag, I found
>>>>>>> out there's a parameter for the cache limit:
>>>>>>> *public* InternalCachedBag(*int* bagCount) {
>>>>>>> *float* percent = 0.2F;
>>>>>>> 
>>>>>>> *if* (PigMapReduce./sJobConfInternal/.get() != *null*) {
>>>>>>> // here, the cache limit is from here!
>>>>>>> String usage =
>>>>>>> PigMapReduce./sJobConfInternal/.get().get("pig.cachedbag.memusage");
>>>>>>> *if* (usage != *null*) {
>>>>>>> percent = Float./parseFloat/(usage);
>>>>>>> }
>>>>>>> }
>>>>>>> 
>>>>>>>       init(bagCount, percent);
>>>>>>>   }
>>>>>>> *private* *void* init(*int* bagCount, *float* percent) {
>>>>>>> factory = TupleFactory./getInstance/();
>>>>>>> mContents = *new* ArrayList<Tuple>();
>>>>>>> 
>>>>>>> *long* max = Runtime./getRuntime/().maxMemory();
>>>>>>> maxMemUsage = (*long*)(((*float*)max * percent) / (*float*)bagCount);
>>>>>>> cacheLimit = Integer./MAX_VALUE/;
>>>>>>> 
>>>>>>> // set limit to 0, if memusage is 0 or really really small.
>>>>>>> // then all tuples are put into disk
>>>>>>> *if* (maxMemUsage < 1) {
>>>>>>> cacheLimit = 0;
>>>>>>>       }
>>>>>>> /log/.warn("cacheLimit: " + *this*.cacheLimit);
>>>>>>> addDone = *false*;
>>>>>>>   }
>>>>>>> 
>>>>>>> so, after write pig.cachedbag.memusage=0 into
>>>>>>> $PIG_HOME/conf/pig.properties, my job successes!
>>>>>>> 
>>>>>>> You can also set to an appropriate value to fully utilize your memory
>>>>>>> as a cache.
>>>>>>> 
>>>>>>> Hope this is useful for others.
>>>>>>> Thanks.
>>>>>>> 
>>>>>>> 
>>>>>>> Haitao Yao
>>>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>>>> weibo: @haitao_yao
>>>>>>> Skype:  haitao.yao.final
>>>>>>> 
>>>>>>> 在 2012-7-10,下午1:06, Haitao Yao 写道:
>>>>>>> 
>>>>>>>> my reducers get 512 MB, -Xms512M -Xmx512M.
>>>>>>>> The reducer does not get OOM when manually invoke spill in my case.
>>>>>>>> 
>>>>>>>> Can you explain more about your solution?
>>>>>>>> And can your solution fit into 512MB reducer process?
>>>>>>>> Thanks very much.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Haitao Yao
>>>>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>>>>> weibo: @haitao_yao
>>>>>>>> Skype:  haitao.yao.final
>>>>>>>> 
>>>>>>>> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
>>>>>>>> 
>>>>>>>>> I have something in the mix that should reduce bag memory :)
>>>>>>>>> Question: how
>>>>>>>>> much memory are your reducers getting? In my experience, you'll get
>>>>>>>>> OOM's
>>>>>>>>> on spilling if you have allocated less than a gig to the JVM
>>>>>>>>> 
>>>>>>>>> 2012/7/9 Haitao Yao <yao.erix@gmail.com <mailto:yao.erix@gmail.com
>>>>> 
>>>>>>>>> 
>>>>>>>>>> I have encountered the similar problem.  And I got a OOM while
>>>>>>>>>> running the
>>>>>>>>>> reducer.
>>>>>>>>>> I think the reason is the data bag generated after group all is too
>>>>>>>>>> big to
>>>>>>>>>> fit into the reducer's memory.
>>>>>>>>>> 
>>>>>>>>>> and I have written a new COUNT implementation with explicit invoke
>>>>>>>>>> System.gc() and spill  after the COUNT function finish its job,
>>> but it
>>>>>>>>>> still get OOM
>>>>>>>>>> 
>>>>>>>>>> here's the code of the new COUNT implementation:
>>>>>>>>>>      @Override
>>>>>>>>>>      public Long exec(Tuple input) throws IOException {
>>>>>>>>>>              DataBag bag = (DataBag)input.get(0);
>>>>>>>>>>              Long result = super.exec(input);
>>>>>>>>>>              LOG.warn(" before spill data bag memory : " +
>>>>>>>>>> Runtime.getRuntime().freeMemory());
>>>>>>>>>>              bag.spill();
>>>>>>>>>>              System.gc();
>>>>>>>>>>              LOG.warn(" after spill data bag memory : " +
>>>>>>>>>> Runtime.getRuntime().freeMemory());
>>>>>>>>>>              LOG.warn("big bag size: " + bag.size() + ",
>>>>>>>>>> hashcode: " +
>>>>>>>>>> bag.hashCode());
>>>>>>>>>>              return result;
>>>>>>>>>>      }
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> I think we have to redesign the data bag implementation with less
>>>>>>>>>> memory
>>>>>>>>>> consumed.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Haitao Yao
>>>>>>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>>>>>>> weibo: @haitao_yao
>>>>>>>>>> Skype:  haitao.yao.final
>>>>>>>>>> 
>>>>>>>>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>>>>>>>>>> 
>>>>>>>>>>> the pig script:
>>>>>>>>>>> 
>>>>>>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>>>>>>> 
>>>>>>>>>>> grpall = group longDesc all;
>>>>>>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>>>>>>> explain cnt;
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> the dump relation result:
>>>>>>>>>>> 
>>>>>>>>>>> #-----------------------------------------------
>>>>>>>>>>> # New Logical Plan:
>>>>>>>>>>> #-----------------------------------------------
>>>>>>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>>>>>>> |
>>>>>>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>>>>>>> |   |
>>>>>>>>>>> |   (Name: LOGenerate[false] Schema:
>>>>>>>>>>> 
>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>>>>>>> |   |   |
>>>>>>>>>>> |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long
>>>>>>>>>>> Uid:
>>>>>>>>>>> 65)
>>>>>>>>>>> |   |   |
>>>>>>>>>>> |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0
>>>>>>>>>>> Column:
>>>>>>>>>>> (*))
>>>>>>>>>>> |   |
>>>>>>>>>>> |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>>>>>>>> 
>>>>>>>>>> 
>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>>>>>>> |
>>>>>>>>>>> |---grpall: (Name: LOCogroup Schema:
>>>>>>>>>>> 
>>>>>>>>>> 
>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>>>>>>>     |   |
>>>>>>>>>>>     |   (Name: Constant Type: chararray Uid: 62)
>>>>>>>>>>>     |
>>>>>>>>>>>     |---longDesc: (Name: LOLoad Schema:
>>>>>>>>>>> 
>>>>>>>>>> 
>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>>>>>> 
>>>>>>>>>>> #-----------------------------------------------
>>>>>>>>>>> # Physical Plan:
>>>>>>>>>>> #-----------------------------------------------
>>>>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>>>>> |
>>>>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>>>>> |   |
>>>>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>>>>>>> |   |
>>>>>>>>>>> |   |---Project[bag][1] - scope-5
>>>>>>>>>>> |
>>>>>>>>>>> |---grpall: Package[tuple]{chararray} - scope-2
>>>>>>>>>>>     |
>>>>>>>>>>>     |---grpall: Global Rearrange[tuple] - scope-1
>>>>>>>>>>>         |
>>>>>>>>>>>         |---grpall: Local Rearrange[tuple]{chararray}(false) -
>>>>>>>>>>> scope-3
>>>>>>>>>>>             |   |
>>>>>>>>>>>             |   Constant(all) - scope-4
>>>>>>>>>>>             |
>>>>>>>>>>>             |---longDesc:
>>>>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>>>>>> 
>>>>>>>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>>>>>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler
>>>>>>>>>>> -
>>>>>>>>>>> File concatenation threshold: 100 optimistic? false
>>>>>>>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>>>>>>>> 
>>>>>>>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>>>>>>>> - Choosing to move algebraic foreach to combiner
>>>>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>>>>> 
>>>>>>>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>>>>> - MR plan size before optimization: 1
>>>>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>>>>> 
>>>>>>>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>>>>> - MR plan size after optimization: 1
>>>>>>>>>>> #--------------------------------------------------
>>>>>>>>>>> # Map Reduce Plan
>>>>>>>>>>> #--------------------------------------------------
>>>>>>>>>>> MapReduce node scope-10
>>>>>>>>>>> Map Plan
>>>>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>>>>>>> |   |
>>>>>>>>>>> |   Project[chararray][0] - scope-23
>>>>>>>>>>> |
>>>>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>>>>>>> |   |
>>>>>>>>>>> |   Project[chararray][0] - scope-12
>>>>>>>>>>> |   |
>>>>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] -
>>>>>>>>>>> scope-13
>>>>>>>>>>> |   |
>>>>>>>>>>> |   |---Project[bag][1] - scope-14
>>>>>>>>>>> |
>>>>>>>>>>> |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>>>>>>>     |
>>>>>>>>>>>     |---longDesc:
>>>>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) -
>>>>>>>>>>> scope-0--------
>>>>>>>>>>> Combine Plan
>>>>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>>>>>>> |   |
>>>>>>>>>>> |   Project[chararray][0] - scope-27
>>>>>>>>>>> |
>>>>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>>>>>>> |   |
>>>>>>>>>>> |   Project[chararray][0] - scope-16
>>>>>>>>>>> |   |
>>>>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple]
>>> -
>>>>>>>>>>> scope-17
>>>>>>>>>>> |   |
>>>>>>>>>>> |   |---Project[bag][1] - scope-18
>>>>>>>>>>> |
>>>>>>>>>>> |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>>>>>>>>> Reduce Plan
>>>>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>>>>> |
>>>>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>>>>> |   |
>>>>>>>>>>> |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] -
>>> scope-6
>>>>>>>>>>> |   |
>>>>>>>>>>> |   |---Project[bag][1] - scope-19
>>>>>>>>>>> |
>>>>>>>>>>> |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>>>>>>>>> Global sort: false
>>>>>>>>>>> ----------------
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney
>>>>>>>>>>> <jcoveney@gmail.com <ma...@gmail.com>>
>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> instead of doing "dump relation," do "explain relation" (then run
>>>>>>>>>>>> identically) and paste the output here. It will show whether the
>>>>>>>>>> combiner
>>>>>>>>>>>> is being used,
>>>>>>>>>>>> 
>>>>>>>>>>>> 2012/7/3 Ruslan Al-Fakikh <ruslan.al-fakikh@jalent.ru
>>>>>>>>>>>> <ma...@jalent.ru>>
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> As it was said, COUNT is algebraic and should be fast, because
>>> it
>>>>>>>>>>>>> forces combiner. You should make sure that combiner is really
>>> used
>>>>>>>>>>>>> here. It can be disabled in some situations. I've encountered
>>> such
>>>>>>>>>>>>> situations many times when a job is tooo heavy in case no
>>>>>>>>>>>>> combiner is
>>>>>>>>>>>>> applied.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Ruslan
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S
>>>>>>>>>>>>> <subir.sasikumar@gmail.com <ma...@gmail.com>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Right!!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Since it is mentioned that job is hanging, wild guess is it
>>> must be
>>>>>>>>>>>>>> 'group all'. How can that be confirmed?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 7/3/12, Jonathan Coveney <jcoveney@gmail.com
>>>>>>>>>>>>>> <ma...@gmail.com>> wrote:
>>>>>>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and
>>>>>>>>>>>>>>> as such,
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>> use combiners, so it is generally quite fast.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 2012/7/2 Subir S <subir.sasikumar@gmail.com
>>>>>>>>>>>>>>> <ma...@gmail.com>>
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count
>>> per
>>>>>>>>>> group
>>>>>>>>>>>>>>>> and sum may be.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 7/3/12, Sheng Guo <enigmaguo@gmail.com
>>>>>>>>>>>>>>>> <ma...@gmail.com>> wrote:
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I used to use the following pig script to do the counting
>>> of the
>>>>>>>>>>>>>>>>> records.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> but sometimes, when the records get larger, it takes lots of
>>>>>>>>>>>>>>>>> time
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> hang
>>>>>>>>>>>>>>>>> up, and or die.
>>>>>>>>>>>>>>>>> I thought counting should be simple enough, so what is the
>>>>>>>>>>>>>>>>> best way
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> do a
>>>>>>>>>>>>>>>>> counting in pig?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Sheng
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best Regards,
>>>>>>>>>>>>> Ruslan Al-Fakikh
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
> 


Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
the listserv strips attachments. you'll have to host it somewhere else and
link it

2012/7/11 Haitao Yao <ya...@gmail.com>

> Sorry , I sent the mail only to Thejas.
>
> Resend it for all.
>
>
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> 在 2012-7-12,上午10:41, Haitao Yao 写道:
>
> >
> >
> > > Is your query using combiner ?
> >       I did know how to explicitly use combiner.
> >
> > > Can you send the explain plan output ?
> >       The explain result is in the attachment. It's a little long.
> >
> > <aa.explain>
> >
> > > Does the heap information say how many entries are there in the
> > InteralCachedBag's ArrayList ?
> >       There's 6 big Array lists, and the size is about 372692
> >       Here's the screen snapshot of the heap dump:
> >
> >       screen snapshot 1: you can see there's 6 big POForeEach instances
> >
> > <aa.jpg>
> >
> >               screen snapshot 2: you can see the memory are mostly
> retained by the big array list.
> >
> > <bb.jpg>
> >
> >               screen snapshot 3: you can see the big array list is
> referenced by InternalCachedBag:
> >
> > <cc.jpg>
> >
> > > What version of pig are you using?
> >       pig-0.9.2, I've read the latest source code of pig from github,
> and I don't find any improvements on IntercalCachedBag.
> >
> >
> > Haitao Yao
> > yao.erix@gmail.com
> > weibo: @haitao_yao
> > Skype:  haitao.yao.final
> >
> > 在 2012-7-12,上午8:56, Thejas Nair 写道:
> >
> >> Haitao,
> >> Is your query using combiner ? Can you send the explain plan output ?
> >> Does the heap information say how many entries are there in the
> >> InteralCachedBag's ArrayList ?
> >> What version of pig are you using ?
> >>
> >>
> >> Thanks,
> >> Thejas
> >>
> >>
> >> On 7/10/12 11:50 PM, Haitao Yao wrote:
> >>> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because
> >>> every time the InternalCachedBag spills, It creates a new tmp file in
> >>> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple
> >>> added into InternalCachedBag will create a new tmp file. And the tmp
> >>> file is deleted on exit.
> >>> So , if you're unlucky like me, you will get a OOM Exception caused by
> >>> java.io.DeleteOnExitHook!
> >>> Here's the evidence:
> >>>
> >>> God, we really need a full description of how every parameter works.
> >>>
> >>>
> >>>
> >>> Haitao Yao
> >>> yao.erix@gmail.com <ma...@gmail.com>
> >>> weibo: @haitao_yao
> >>> Skype:  haitao.yao.final
> >>>
> >>> 在 2012-7-10,下午4:20, Haitao Yao 写道:
> >>>
> >>>> I found the solution.
> >>>>
> >>>> After analyzing the heap dump while the reducer OOM, I found out the
> >>>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's
> >>>> the diagram:
> >>>> <cc.jpg>
> >>>>
> >>>> In the source code of org.apache.pig.data.InternalCachedBag, I found
> >>>> out there's a parameter for the cache limit:
> >>>> *public* InternalCachedBag(*int* bagCount) {
> >>>> *float* percent = 0.2F;
> >>>>
> >>>> *if* (PigMapReduce./sJobConfInternal/.get() != *null*) {
> >>>> // here, the cache limit is from here!
> >>>> String usage =
> >>>> PigMapReduce./sJobConfInternal/.get().get("pig.cachedbag.memusage");
> >>>> *if* (usage != *null*) {
> >>>> percent = Float./parseFloat/(usage);
> >>>> }
> >>>> }
> >>>>
> >>>>        init(bagCount, percent);
> >>>>    }
> >>>> *private* *void* init(*int* bagCount, *float* percent) {
> >>>> factory = TupleFactory./getInstance/();
> >>>> mContents = *new* ArrayList<Tuple>();
> >>>>
> >>>> *long* max = Runtime./getRuntime/().maxMemory();
> >>>> maxMemUsage = (*long*)(((*float*)max * percent) / (*float*)bagCount);
> >>>> cacheLimit = Integer./MAX_VALUE/;
> >>>>
> >>>> // set limit to 0, if memusage is 0 or really really small.
> >>>> // then all tuples are put into disk
> >>>> *if* (maxMemUsage < 1) {
> >>>> cacheLimit = 0;
> >>>>        }
> >>>> /log/.warn("cacheLimit: " + *this*.cacheLimit);
> >>>> addDone = *false*;
> >>>>    }
> >>>>
> >>>> so, after write pig.cachedbag.memusage=0 into
> >>>> $PIG_HOME/conf/pig.properties, my job successes!
> >>>>
> >>>> You can also set to an appropriate value to fully utilize your memory
> >>>> as a cache.
> >>>>
> >>>> Hope this is useful for others.
> >>>> Thanks.
> >>>>
> >>>>
> >>>> Haitao Yao
> >>>> yao.erix@gmail.com <ma...@gmail.com>
> >>>> weibo: @haitao_yao
> >>>> Skype:  haitao.yao.final
> >>>>
> >>>> 在 2012-7-10,下午1:06, Haitao Yao 写道:
> >>>>
> >>>>> my reducers get 512 MB, -Xms512M -Xmx512M.
> >>>>> The reducer does not get OOM when manually invoke spill in my case.
> >>>>>
> >>>>> Can you explain more about your solution?
> >>>>> And can your solution fit into 512MB reducer process?
> >>>>> Thanks very much.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Haitao Yao
> >>>>> yao.erix@gmail.com <ma...@gmail.com>
> >>>>> weibo: @haitao_yao
> >>>>> Skype:  haitao.yao.final
> >>>>>
> >>>>> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
> >>>>>
> >>>>>> I have something in the mix that should reduce bag memory :)
> >>>>>> Question: how
> >>>>>> much memory are your reducers getting? In my experience, you'll get
> >>>>>> OOM's
> >>>>>> on spilling if you have allocated less than a gig to the JVM
> >>>>>>
> >>>>>> 2012/7/9 Haitao Yao <yao.erix@gmail.com <mailto:yao.erix@gmail.com
> >>
> >>>>>>
> >>>>>>> I have encountered the similar problem.  And I got a OOM while
> >>>>>>> running the
> >>>>>>> reducer.
> >>>>>>> I think the reason is the data bag generated after group all is too
> >>>>>>> big to
> >>>>>>> fit into the reducer's memory.
> >>>>>>>
> >>>>>>> and I have written a new COUNT implementation with explicit invoke
> >>>>>>> System.gc() and spill  after the COUNT function finish its job,
> but it
> >>>>>>> still get OOM
> >>>>>>>
> >>>>>>> here's the code of the new COUNT implementation:
> >>>>>>>       @Override
> >>>>>>>       public Long exec(Tuple input) throws IOException {
> >>>>>>>               DataBag bag = (DataBag)input.get(0);
> >>>>>>>               Long result = super.exec(input);
> >>>>>>>               LOG.warn(" before spill data bag memory : " +
> >>>>>>> Runtime.getRuntime().freeMemory());
> >>>>>>>               bag.spill();
> >>>>>>>               System.gc();
> >>>>>>>               LOG.warn(" after spill data bag memory : " +
> >>>>>>> Runtime.getRuntime().freeMemory());
> >>>>>>>               LOG.warn("big bag size: " + bag.size() + ",
> >>>>>>> hashcode: " +
> >>>>>>> bag.hashCode());
> >>>>>>>               return result;
> >>>>>>>       }
> >>>>>>>
> >>>>>>>
> >>>>>>> I think we have to redesign the data bag implementation with less
> >>>>>>> memory
> >>>>>>> consumed.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Haitao Yao
> >>>>>>> yao.erix@gmail.com <ma...@gmail.com>
> >>>>>>> weibo: @haitao_yao
> >>>>>>> Skype:  haitao.yao.final
> >>>>>>>
> >>>>>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
> >>>>>>>
> >>>>>>>> the pig script:
> >>>>>>>>
> >>>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
> >>>>>>>>
> >>>>>>>> grpall = group longDesc all;
> >>>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
> >>>>>>>> explain cnt;
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> the dump relation result:
> >>>>>>>>
> >>>>>>>> #-----------------------------------------------
> >>>>>>>> # New Logical Plan:
> >>>>>>>> #-----------------------------------------------
> >>>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
> >>>>>>>> |
> >>>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
> >>>>>>>>  |   |
> >>>>>>>>  |   (Name: LOGenerate[false] Schema:
> >>>>>>>>
> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
> >>>>>>>>  |   |   |
> >>>>>>>>  |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long
> >>>>>>>> Uid:
> >>>>>>>> 65)
> >>>>>>>>  |   |   |
> >>>>>>>>  |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0
> >>>>>>>> Column:
> >>>>>>>> (*))
> >>>>>>>>  |   |
> >>>>>>>>  |   |---longDesc: (Name: LOInnerLoad[1] Schema:
> >>>>>>>>
> >>>>>>>
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
> >>>>>>>>  |
> >>>>>>>>  |---grpall: (Name: LOCogroup Schema:
> >>>>>>>>
> >>>>>>>
> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
> >>>>>>>>      |   |
> >>>>>>>>      |   (Name: Constant Type: chararray Uid: 62)
> >>>>>>>>      |
> >>>>>>>>      |---longDesc: (Name: LOLoad Schema:
> >>>>>>>>
> >>>>>>>
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
> >>>>>>>>
> >>>>>>>> #-----------------------------------------------
> >>>>>>>> # Physical Plan:
> >>>>>>>> #-----------------------------------------------
> >>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
> >>>>>>>> |
> >>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
> >>>>>>>>  |   |
> >>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
> >>>>>>>>  |   |
> >>>>>>>>  |   |---Project[bag][1] - scope-5
> >>>>>>>>  |
> >>>>>>>>  |---grpall: Package[tuple]{chararray} - scope-2
> >>>>>>>>      |
> >>>>>>>>      |---grpall: Global Rearrange[tuple] - scope-1
> >>>>>>>>          |
> >>>>>>>>          |---grpall: Local Rearrange[tuple]{chararray}(false) -
> >>>>>>>> scope-3
> >>>>>>>>              |   |
> >>>>>>>>              |   Constant(all) - scope-4
> >>>>>>>>              |
> >>>>>>>>              |---longDesc:
> >>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
> >>>>>>>>
> >>>>>>>> 2012-07-09 15:47:02,441 [main] INFO
> >>>>>>>>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler
> >>>>>>>> -
> >>>>>>>> File concatenation threshold: 100 optimistic? false
> >>>>>>>> 2012-07-09 15:47:02,448 [main] INFO
> >>>>>>>>
> >>>>>>>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
> >>>>>>>> - Choosing to move algebraic foreach to combiner
> >>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
> >>>>>>>>
> >>>>>>>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
> >>>>>>>> - MR plan size before optimization: 1
> >>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
> >>>>>>>>
> >>>>>>>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
> >>>>>>>> - MR plan size after optimization: 1
> >>>>>>>> #--------------------------------------------------
> >>>>>>>> # Map Reduce Plan
> >>>>>>>> #--------------------------------------------------
> >>>>>>>> MapReduce node scope-10
> >>>>>>>> Map Plan
> >>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
> >>>>>>>> |   |
> >>>>>>>> |   Project[chararray][0] - scope-23
> >>>>>>>> |
> >>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
> >>>>>>>>  |   |
> >>>>>>>>  |   Project[chararray][0] - scope-12
> >>>>>>>>  |   |
> >>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] -
> >>>>>>>> scope-13
> >>>>>>>>  |   |
> >>>>>>>>  |   |---Project[bag][1] - scope-14
> >>>>>>>>  |
> >>>>>>>>  |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
> >>>>>>>>      |
> >>>>>>>>      |---longDesc:
> >>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) -
> >>>>>>>> scope-0--------
> >>>>>>>> Combine Plan
> >>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
> >>>>>>>> |   |
> >>>>>>>> |   Project[chararray][0] - scope-27
> >>>>>>>> |
> >>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
> >>>>>>>>  |   |
> >>>>>>>>  |   Project[chararray][0] - scope-16
> >>>>>>>>  |   |
> >>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple]
> -
> >>>>>>>> scope-17
> >>>>>>>>  |   |
> >>>>>>>>  |   |---Project[bag][1] - scope-18
> >>>>>>>>  |
> >>>>>>>>  |---POCombinerPackage[tuple]{chararray} - scope-20--------
> >>>>>>>> Reduce Plan
> >>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
> >>>>>>>> |
> >>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
> >>>>>>>>  |   |
> >>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] -
> scope-6
> >>>>>>>>  |   |
> >>>>>>>>  |   |---Project[bag][1] - scope-19
> >>>>>>>>  |
> >>>>>>>>  |---POCombinerPackage[tuple]{chararray} - scope-28--------
> >>>>>>>> Global sort: false
> >>>>>>>> ----------------
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney
> >>>>>>>> <jcoveney@gmail.com <ma...@gmail.com>>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> instead of doing "dump relation," do "explain relation" (then run
> >>>>>>>>> identically) and paste the output here. It will show whether the
> >>>>>>> combiner
> >>>>>>>>> is being used,
> >>>>>>>>>
> >>>>>>>>> 2012/7/3 Ruslan Al-Fakikh <ruslan.al-fakikh@jalent.ru
> >>>>>>>>> <ma...@jalent.ru>>
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> As it was said, COUNT is algebraic and should be fast, because
> it
> >>>>>>>>>> forces combiner. You should make sure that combiner is really
> used
> >>>>>>>>>> here. It can be disabled in some situations. I've encountered
> such
> >>>>>>>>>> situations many times when a job is tooo heavy in case no
> >>>>>>>>>> combiner is
> >>>>>>>>>> applied.
> >>>>>>>>>>
> >>>>>>>>>> Ruslan
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S
> >>>>>>>>>> <subir.sasikumar@gmail.com <ma...@gmail.com>>
> >>>>>>>>> wrote:
> >>>>>>>>>>> Right!!
> >>>>>>>>>>>
> >>>>>>>>>>> Since it is mentioned that job is hanging, wild guess is it
> must be
> >>>>>>>>>>> 'group all'. How can that be confirmed?
> >>>>>>>>>>>
> >>>>>>>>>>> On 7/3/12, Jonathan Coveney <jcoveney@gmail.com
> >>>>>>>>>>> <ma...@gmail.com>> wrote:
> >>>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and
> >>>>>>>>>>>> as such,
> >>>>>>>>>> will
> >>>>>>>>>>>> use combiners, so it is generally quite fast.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2012/7/2 Subir S <subir.sasikumar@gmail.com
> >>>>>>>>>>>> <ma...@gmail.com>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count
> per
> >>>>>>> group
> >>>>>>>>>>>>> and sum may be.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 7/3/12, Sheng Guo <enigmaguo@gmail.com
> >>>>>>>>>>>>> <ma...@gmail.com>> wrote:
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I used to use the following pig script to do the counting
> of the
> >>>>>>>>>>>>>> records.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
> >>>>>>>>>>>>>> grpd = group m_skill_group all;
> >>>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> cnt_filter = limit cnt 10;
> >>>>>>>>>>>>>> dump cnt_filter;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> but sometimes, when the records get larger, it takes lots of
> >>>>>>>>>>>>>> time
> >>>>>>>>> and
> >>>>>>>>>>>>> hang
> >>>>>>>>>>>>>> up, and or die.
> >>>>>>>>>>>>>> I thought counting should be simple enough, so what is the
> >>>>>>>>>>>>>> best way
> >>>>>>>>>> to
> >>>>>>>>>>>>> do a
> >>>>>>>>>>>>>> counting in pig?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Sheng
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Best Regards,
> >>>>>>>>>> Ruslan Al-Fakikh
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: What is the best way to do counting in pig?

Posted by Haitao Yao <ya...@gmail.com>.
Sorry , I sent the mail only to Thejas. 

Resend it for all.


Haitao Yao
yao.erix@gmail.com
weibo: @haitao_yao
Skype:  haitao.yao.final

在 2012-7-12,上午10:41, Haitao Yao 写道:

> 
> 
> > Is your query using combiner ?
> 	I did know how to explicitly use combiner.
> 
> > Can you send the explain plan output ?
> 	The explain result is in the attachment. It's a little long. 
> 	
> <aa.explain>
> 
> > Does the heap information say how many entries are there in the
> InteralCachedBag's ArrayList ?
> 	There's 6 big Array lists, and the size is about 372692
> 	Here's the screen snapshot of the heap dump:
> 
> 	screen snapshot 1: you can see there's 6 big POForeEach instances
> 	
> <aa.jpg>
> 
> 		screen snapshot 2: you can see the memory are mostly retained by the big array list.
> 		
> <bb.jpg>
> 
> 		screen snapshot 3: you can see the big array list is referenced by InternalCachedBag: 
> 			
> <cc.jpg>
> 	
> > What version of pig are you using?
> 	pig-0.9.2, I've read the latest source code of pig from github, and I don't find any improvements on IntercalCachedBag.
> 
> 
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
> 
> 在 2012-7-12,上午8:56, Thejas Nair 写道:
> 
>> Haitao,
>> Is your query using combiner ? Can you send the explain plan output ?
>> Does the heap information say how many entries are there in the
>> InteralCachedBag's ArrayList ?
>> What version of pig are you using ?
>> 
>> 
>> Thanks,
>> Thejas
>> 
>> 
>> On 7/10/12 11:50 PM, Haitao Yao wrote:
>>> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because 
>>> every time the InternalCachedBag spills, It creates a new tmp file in 
>>> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple 
>>> added into InternalCachedBag will create a new tmp file. And the tmp 
>>> file is deleted on exit.
>>> So , if you're unlucky like me, you will get a OOM Exception caused by 
>>> java.io.DeleteOnExitHook!
>>> Here's the evidence:
>>> 
>>> God, we really need a full description of how every parameter works.
>>> 
>>> 
>>> 
>>> Haitao Yao
>>> yao.erix@gmail.com <ma...@gmail.com>
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>> 
>>> 在 2012-7-10,下午4:20, Haitao Yao 写道:
>>> 
>>>> I found the solution.
>>>> 
>>>> After analyzing the heap dump while the reducer OOM, I found out the 
>>>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's 
>>>> the diagram:
>>>> <cc.jpg>
>>>> 
>>>> In the source code of org.apache.pig.data.InternalCachedBag, I found 
>>>> out there's a parameter for the cache limit:
>>>> *public* InternalCachedBag(*int* bagCount) {
>>>> *float* percent = 0.2F;
>>>> 
>>>> *if* (PigMapReduce./sJobConfInternal/.get() != *null*) {
>>>> // here, the cache limit is from here!
>>>> String usage = 
>>>> PigMapReduce./sJobConfInternal/.get().get("pig.cachedbag.memusage");
>>>> *if* (usage != *null*) {
>>>> percent = Float./parseFloat/(usage);
>>>> }
>>>> }
>>>> 
>>>>        init(bagCount, percent);
>>>>    }
>>>> *private* *void* init(*int* bagCount, *float* percent) {
>>>> factory = TupleFactory./getInstance/();
>>>> mContents = *new* ArrayList<Tuple>();
>>>> 
>>>> *long* max = Runtime./getRuntime/().maxMemory();
>>>> maxMemUsage = (*long*)(((*float*)max * percent) / (*float*)bagCount);
>>>> cacheLimit = Integer./MAX_VALUE/;
>>>> 
>>>> // set limit to 0, if memusage is 0 or really really small.
>>>> // then all tuples are put into disk
>>>> *if* (maxMemUsage < 1) {
>>>> cacheLimit = 0;
>>>>        }
>>>> /log/.warn("cacheLimit: " + *this*.cacheLimit);
>>>> addDone = *false*;
>>>>    }
>>>> 
>>>> so, after write pig.cachedbag.memusage=0 into 
>>>> $PIG_HOME/conf/pig.properties, my job successes!
>>>> 
>>>> You can also set to an appropriate value to fully utilize your memory 
>>>> as a cache.
>>>> 
>>>> Hope this is useful for others.
>>>> Thanks.
>>>> 
>>>> 
>>>> Haitao Yao
>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>> weibo: @haitao_yao
>>>> Skype:  haitao.yao.final
>>>> 
>>>> 在 2012-7-10,下午1:06, Haitao Yao 写道:
>>>> 
>>>>> my reducers get 512 MB, -Xms512M -Xmx512M.
>>>>> The reducer does not get OOM when manually invoke spill in my case.
>>>>> 
>>>>> Can you explain more about your solution?
>>>>> And can your solution fit into 512MB reducer process?
>>>>> Thanks very much.
>>>>> 
>>>>> 
>>>>> 
>>>>> Haitao Yao
>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>> weibo: @haitao_yao
>>>>> Skype:  haitao.yao.final
>>>>> 
>>>>> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
>>>>> 
>>>>>> I have something in the mix that should reduce bag memory :) 
>>>>>> Question: how
>>>>>> much memory are your reducers getting? In my experience, you'll get 
>>>>>> OOM's
>>>>>> on spilling if you have allocated less than a gig to the JVM
>>>>>> 
>>>>>> 2012/7/9 Haitao Yao <yao.erix@gmail.com <ma...@gmail.com>>
>>>>>> 
>>>>>>> I have encountered the similar problem.  And I got a OOM while 
>>>>>>> running the
>>>>>>> reducer.
>>>>>>> I think the reason is the data bag generated after group all is too 
>>>>>>> big to
>>>>>>> fit into the reducer's memory.
>>>>>>> 
>>>>>>> and I have written a new COUNT implementation with explicit invoke
>>>>>>> System.gc() and spill  after the COUNT function finish its job, but it
>>>>>>> still get OOM
>>>>>>> 
>>>>>>> here's the code of the new COUNT implementation:
>>>>>>>       @Override
>>>>>>>       public Long exec(Tuple input) throws IOException {
>>>>>>>               DataBag bag = (DataBag)input.get(0);
>>>>>>>               Long result = super.exec(input);
>>>>>>>               LOG.warn(" before spill data bag memory : " +
>>>>>>> Runtime.getRuntime().freeMemory());
>>>>>>>               bag.spill();
>>>>>>>               System.gc();
>>>>>>>               LOG.warn(" after spill data bag memory : " +
>>>>>>> Runtime.getRuntime().freeMemory());
>>>>>>>               LOG.warn("big bag size: " + bag.size() + ", 
>>>>>>> hashcode: " +
>>>>>>> bag.hashCode());
>>>>>>>               return result;
>>>>>>>       }
>>>>>>> 
>>>>>>> 
>>>>>>> I think we have to redesign the data bag implementation with less 
>>>>>>> memory
>>>>>>> consumed.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Haitao Yao
>>>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>>>> weibo: @haitao_yao
>>>>>>> Skype:  haitao.yao.final
>>>>>>> 
>>>>>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>>>>>>> 
>>>>>>>> the pig script:
>>>>>>>> 
>>>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>>>> 
>>>>>>>> grpall = group longDesc all;
>>>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>>>> explain cnt;
>>>>>>>> 
>>>>>>>> 
>>>>>>>> the dump relation result:
>>>>>>>> 
>>>>>>>> #-----------------------------------------------
>>>>>>>> # New Logical Plan:
>>>>>>>> #-----------------------------------------------
>>>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>>>> |
>>>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>>>>  |   |
>>>>>>>>  |   (Name: LOGenerate[false] Schema:
>>>>>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>>>>  |   |   |
>>>>>>>>  |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long 
>>>>>>>> Uid:
>>>>>>>> 65)
>>>>>>>>  |   |   |
>>>>>>>>  |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 
>>>>>>>> Column:
>>>>>>>> (*))
>>>>>>>>  |   |
>>>>>>>>  |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>>>>> 
>>>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>>>>  |
>>>>>>>>  |---grpall: (Name: LOCogroup Schema:
>>>>>>>> 
>>>>>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>>>>      |   |
>>>>>>>>      |   (Name: Constant Type: chararray Uid: 62)
>>>>>>>>      |
>>>>>>>>      |---longDesc: (Name: LOLoad Schema:
>>>>>>>> 
>>>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>>> 
>>>>>>>> #-----------------------------------------------
>>>>>>>> # Physical Plan:
>>>>>>>> #-----------------------------------------------
>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-5
>>>>>>>>  |
>>>>>>>>  |---grpall: Package[tuple]{chararray} - scope-2
>>>>>>>>      |
>>>>>>>>      |---grpall: Global Rearrange[tuple] - scope-1
>>>>>>>>          |
>>>>>>>>          |---grpall: Local Rearrange[tuple]{chararray}(false) - 
>>>>>>>> scope-3
>>>>>>>>              |   |
>>>>>>>>              |   Constant(all) - scope-4
>>>>>>>>              |
>>>>>>>>              |---longDesc:
>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>>> 
>>>>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler 
>>>>>>>> -
>>>>>>>> File concatenation threshold: 100 optimistic? false
>>>>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>>>>> 
>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>>>>> - Choosing to move algebraic foreach to combiner
>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>> 
>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>> - MR plan size before optimization: 1
>>>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>>> 
>>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>>>> - MR plan size after optimization: 1
>>>>>>>> #--------------------------------------------------
>>>>>>>> # Map Reduce Plan
>>>>>>>> #--------------------------------------------------
>>>>>>>> MapReduce node scope-10
>>>>>>>> Map Plan
>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-23
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>>>>  |   |
>>>>>>>>  |   Project[chararray][0] - scope-12
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - 
>>>>>>>> scope-13
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-14
>>>>>>>>  |
>>>>>>>>  |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>>>>      |
>>>>>>>>      |---longDesc:
>>>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - 
>>>>>>>> scope-0--------
>>>>>>>> Combine Plan
>>>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>>>> |   |
>>>>>>>> |   Project[chararray][0] - scope-27
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>>>>  |   |
>>>>>>>>  |   Project[chararray][0] - scope-16
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>>>>>>>> scope-17
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-18
>>>>>>>>  |
>>>>>>>>  |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>>>>>> Reduce Plan
>>>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>>>> |
>>>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>>>  |   |
>>>>>>>>  |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>>>>  |   |
>>>>>>>>  |   |---Project[bag][1] - scope-19
>>>>>>>>  |
>>>>>>>>  |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>>>>>> Global sort: false
>>>>>>>> ----------------
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney 
>>>>>>>> <jcoveney@gmail.com <ma...@gmail.com>>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> instead of doing "dump relation," do "explain relation" (then run
>>>>>>>>> identically) and paste the output here. It will show whether the
>>>>>>> combiner
>>>>>>>>> is being used,
>>>>>>>>> 
>>>>>>>>> 2012/7/3 Ruslan Al-Fakikh <ruslan.al-fakikh@jalent.ru 
>>>>>>>>> <ma...@jalent.ru>>
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>>>>>>> forces combiner. You should make sure that combiner is really used
>>>>>>>>>> here. It can be disabled in some situations. I've encountered such
>>>>>>>>>> situations many times when a job is tooo heavy in case no 
>>>>>>>>>> combiner is
>>>>>>>>>> applied.
>>>>>>>>>> 
>>>>>>>>>> Ruslan
>>>>>>>>>> 
>>>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S 
>>>>>>>>>> <subir.sasikumar@gmail.com <ma...@gmail.com>>
>>>>>>>>> wrote:
>>>>>>>>>>> Right!!
>>>>>>>>>>> 
>>>>>>>>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>>>>>>>>> 'group all'. How can that be confirmed?
>>>>>>>>>>> 
>>>>>>>>>>> On 7/3/12, Jonathan Coveney <jcoveney@gmail.com 
>>>>>>>>>>> <ma...@gmail.com>> wrote:
>>>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and 
>>>>>>>>>>>> as such,
>>>>>>>>>> will
>>>>>>>>>>>> use combiners, so it is generally quite fast.
>>>>>>>>>>>> 
>>>>>>>>>>>> 2012/7/2 Subir S <subir.sasikumar@gmail.com 
>>>>>>>>>>>> <ma...@gmail.com>>
>>>>>>>>>>>> 
>>>>>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count per
>>>>>>> group
>>>>>>>>>>>>> and sum may be.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 7/3/12, Sheng Guo <enigmaguo@gmail.com 
>>>>>>>>>>>>> <ma...@gmail.com>> wrote:
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>>>>>>>>> records.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> but sometimes, when the records get larger, it takes lots of 
>>>>>>>>>>>>>> time
>>>>>>>>> and
>>>>>>>>>>>>> hang
>>>>>>>>>>>>>> up, and or die.
>>>>>>>>>>>>>> I thought counting should be simple enough, so what is the 
>>>>>>>>>>>>>> best way
>>>>>>>>>> to
>>>>>>>>>>>>> do a
>>>>>>>>>>>>>> counting in pig?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sheng
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Best Regards,
>>>>>>>>>> Ruslan Al-Fakikh
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
> 


Re: What is the best way to do counting in pig?

Posted by Thejas Nair <th...@hortonworks.com>.
Haitao,
Is your query using combiner ? Can you send the explain plan output ?
Does the heap information say how many entries are there in the
InteralCachedBag's ArrayList ?
What version of pig are you using ?


Thanks,
Thejas


On 7/10/12 11:50 PM, Haitao Yao wrote:
> Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because 
> every time the InternalCachedBag spills, It creates a new tmp file in 
> java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple 
> added into InternalCachedBag will create a new tmp file. And the tmp 
> file is deleted on exit.
> So , if you're unlucky like me, you will get a OOM Exception caused by 
> java.io.DeleteOnExitHook!
> Here's the evidence:
> 
> God, we really need a full description of how every parameter works.
> 
> 
> 
> Haitao Yao
> yao.erix@gmail.com <ma...@gmail.com>
> weibo: @haitao_yao
> Skype:  haitao.yao.final
> 
> 在 2012-7-10,下午4:20, Haitao Yao 写道:
> 
>> I found the solution.
>>
>> After analyzing the heap dump while the reducer OOM, I found out the 
>> memory is consumed by org.apache.pig.data.InternalCachedBag , here's 
>> the diagram:
>> <cc.jpg>
>>
>> In the source code of org.apache.pig.data.InternalCachedBag, I found 
>> out there's a parameter for the cache limit:
>> *public* InternalCachedBag(*int* bagCount) {
>> *float* percent = 0.2F;
>>
>> *if* (PigMapReduce./sJobConfInternal/.get() != *null*) {
>> // here, the cache limit is from here!
>> String usage = 
>> PigMapReduce./sJobConfInternal/.get().get("pig.cachedbag.memusage");
>> *if* (usage != *null*) {
>> percent = Float./parseFloat/(usage);
>> }
>> }
>>
>>         init(bagCount, percent);
>>     }
>> *private* *void* init(*int* bagCount, *float* percent) {
>> factory = TupleFactory./getInstance/();
>> mContents = *new* ArrayList<Tuple>();
>>
>> *long* max = Runtime./getRuntime/().maxMemory();
>> maxMemUsage = (*long*)(((*float*)max * percent) / (*float*)bagCount);
>> cacheLimit = Integer./MAX_VALUE/;
>>
>> // set limit to 0, if memusage is 0 or really really small.
>> // then all tuples are put into disk
>> *if* (maxMemUsage < 1) {
>> cacheLimit = 0;
>>         }
>> /log/.warn("cacheLimit: " + *this*.cacheLimit);
>> addDone = *false*;
>>     }
>>
>> so, after write pig.cachedbag.memusage=0 into 
>> $PIG_HOME/conf/pig.properties, my job successes!
>>
>> You can also set to an appropriate value to fully utilize your memory 
>> as a cache.
>>
>> Hope this is useful for others.
>> Thanks.
>>
>>
>> Haitao Yao
>> yao.erix@gmail.com <ma...@gmail.com>
>> weibo: @haitao_yao
>> Skype:  haitao.yao.final
>>
>> 在 2012-7-10,下午1:06, Haitao Yao 写道:
>>
>>> my reducers get 512 MB, -Xms512M -Xmx512M.
>>> The reducer does not get OOM when manually invoke spill in my case.
>>>
>>> Can you explain more about your solution?
>>> And can your solution fit into 512MB reducer process?
>>> Thanks very much.
>>>
>>>
>>>
>>> Haitao Yao
>>> yao.erix@gmail.com <ma...@gmail.com>
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>>
>>> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
>>>
>>>> I have something in the mix that should reduce bag memory :) 
>>>> Question: how
>>>> much memory are your reducers getting? In my experience, you'll get 
>>>> OOM's
>>>> on spilling if you have allocated less than a gig to the JVM
>>>>
>>>> 2012/7/9 Haitao Yao <yao.erix@gmail.com <ma...@gmail.com>>
>>>>
>>>>> I have encountered the similar problem.  And I got a OOM while 
>>>>> running the
>>>>> reducer.
>>>>> I think the reason is the data bag generated after group all is too 
>>>>> big to
>>>>> fit into the reducer's memory.
>>>>>
>>>>> and I have written a new COUNT implementation with explicit invoke
>>>>> System.gc() and spill  after the COUNT function finish its job, but it
>>>>> still get OOM
>>>>>
>>>>> here's the code of the new COUNT implementation:
>>>>>        @Override
>>>>>        public Long exec(Tuple input) throws IOException {
>>>>>                DataBag bag = (DataBag)input.get(0);
>>>>>                Long result = super.exec(input);
>>>>>                LOG.warn(" before spill data bag memory : " +
>>>>> Runtime.getRuntime().freeMemory());
>>>>>                bag.spill();
>>>>>                System.gc();
>>>>>                LOG.warn(" after spill data bag memory : " +
>>>>> Runtime.getRuntime().freeMemory());
>>>>>                LOG.warn("big bag size: " + bag.size() + ", 
>>>>> hashcode: " +
>>>>> bag.hashCode());
>>>>>                return result;
>>>>>        }
>>>>>
>>>>>
>>>>> I think we have to redesign the data bag implementation with less 
>>>>> memory
>>>>> consumed.
>>>>>
>>>>>
>>>>>
>>>>> Haitao Yao
>>>>> yao.erix@gmail.com <ma...@gmail.com>
>>>>> weibo: @haitao_yao
>>>>> Skype:  haitao.yao.final
>>>>>
>>>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>>>>>
>>>>>> the pig script:
>>>>>>
>>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>>>
>>>>>> grpall = group longDesc all;
>>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>>> explain cnt;
>>>>>>
>>>>>>
>>>>>> the dump relation result:
>>>>>>
>>>>>> #-----------------------------------------------
>>>>>> # New Logical Plan:
>>>>>> #-----------------------------------------------
>>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>>> |
>>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>>   |   |
>>>>>>   |   (Name: LOGenerate[false] Schema:
>>>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>>   |   |   |
>>>>>>   |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long 
>>>>>> Uid:
>>>>>> 65)
>>>>>>   |   |   |
>>>>>>   |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 
>>>>>> Column:
>>>>>> (*))
>>>>>>   |   |
>>>>>>   |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>>>
>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>>   |
>>>>>>   |---grpall: (Name: LOCogroup Schema:
>>>>>>
>>>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>>       |   |
>>>>>>       |   (Name: Constant Type: chararray Uid: 62)
>>>>>>       |
>>>>>>       |---longDesc: (Name: LOLoad Schema:
>>>>>>
>>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>>>
>>>>>> #-----------------------------------------------
>>>>>> # Physical Plan:
>>>>>> #-----------------------------------------------
>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>> |
>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-5
>>>>>>   |
>>>>>>   |---grpall: Package[tuple]{chararray} - scope-2
>>>>>>       |
>>>>>>       |---grpall: Global Rearrange[tuple] - scope-1
>>>>>>           |
>>>>>>           |---grpall: Local Rearrange[tuple]{chararray}(false) - 
>>>>>> scope-3
>>>>>>               |   |
>>>>>>               |   Constant(all) - scope-4
>>>>>>               |
>>>>>>               |---longDesc:
>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>>>
>>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler 
>>>>>> -
>>>>>> File concatenation threshold: 100 optimistic? false
>>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>>>
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>>> - Choosing to move algebraic foreach to combiner
>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>> - MR plan size before optimization: 1
>>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>>>
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>>> - MR plan size after optimization: 1
>>>>>> #--------------------------------------------------
>>>>>> # Map Reduce Plan
>>>>>> #--------------------------------------------------
>>>>>> MapReduce node scope-10
>>>>>> Map Plan
>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-23
>>>>>> |
>>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>>   |   |
>>>>>>   |   Project[chararray][0] - scope-12
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - 
>>>>>> scope-13
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-14
>>>>>>   |
>>>>>>   |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>>       |
>>>>>>       |---longDesc:
>>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - 
>>>>>> scope-0--------
>>>>>> Combine Plan
>>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>>> |   |
>>>>>> |   Project[chararray][0] - scope-27
>>>>>> |
>>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>>   |   |
>>>>>>   |   Project[chararray][0] - scope-16
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>>>>>> scope-17
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-18
>>>>>>   |
>>>>>>   |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>>>> Reduce Plan
>>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>>> |
>>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>>   |   |
>>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>>   |   |
>>>>>>   |   |---Project[bag][1] - scope-19
>>>>>>   |
>>>>>>   |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>>>> Global sort: false
>>>>>> ----------------
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney 
>>>>>> <jcoveney@gmail.com <ma...@gmail.com>>
>>>>> wrote:
>>>>>>
>>>>>>> instead of doing "dump relation," do "explain relation" (then run
>>>>>>> identically) and paste the output here. It will show whether the
>>>>> combiner
>>>>>>> is being used,
>>>>>>>
>>>>>>> 2012/7/3 Ruslan Al-Fakikh <ruslan.al-fakikh@jalent.ru 
>>>>>>> <ma...@jalent.ru>>
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>>>>> forces combiner. You should make sure that combiner is really used
>>>>>>>> here. It can be disabled in some situations. I've encountered such
>>>>>>>> situations many times when a job is tooo heavy in case no 
>>>>>>>> combiner is
>>>>>>>> applied.
>>>>>>>>
>>>>>>>> Ruslan
>>>>>>>>
>>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S 
>>>>>>>> <subir.sasikumar@gmail.com <ma...@gmail.com>>
>>>>>>> wrote:
>>>>>>>>> Right!!
>>>>>>>>>
>>>>>>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>>>>>>> 'group all'. How can that be confirmed?
>>>>>>>>>
>>>>>>>>> On 7/3/12, Jonathan Coveney <jcoveney@gmail.com 
>>>>>>>>> <ma...@gmail.com>> wrote:
>>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and 
>>>>>>>>>> as such,
>>>>>>>> will
>>>>>>>>>> use combiners, so it is generally quite fast.
>>>>>>>>>>
>>>>>>>>>> 2012/7/2 Subir S <subir.sasikumar@gmail.com 
>>>>>>>>>> <ma...@gmail.com>>
>>>>>>>>>>
>>>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count per
>>>>> group
>>>>>>>>>>> and sum may be.
>>>>>>>>>>>
>>>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>>>>>
>>>>>>>>>>> On 7/3/12, Sheng Guo <enigmaguo@gmail.com 
>>>>>>>>>>> <ma...@gmail.com>> wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>>>>>>> records.
>>>>>>>>>>>>
>>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>>>
>>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> but sometimes, when the records get larger, it takes lots of 
>>>>>>>>>>>> time
>>>>>>> and
>>>>>>>>>>> hang
>>>>>>>>>>>> up, and or die.
>>>>>>>>>>>> I thought counting should be simple enough, so what is the 
>>>>>>>>>>>> best way
>>>>>>>> to
>>>>>>>>>>> do a
>>>>>>>>>>>> counting in pig?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> Sheng
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Ruslan Al-Fakikh
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>
>>
> 



Re: What is the best way to do counting in pig?

Posted by Haitao Yao <ya...@gmail.com>.
Oh, new discovery: we can not set pig.cachedbag.memusage = 0 because every time the InternalCachedBag spills, It creates a new tmp file in java.io.tmpdir. if we set pig.cachedbag.memusage to 0 , every new tuple added into InternalCachedBag will create a new tmp file. And the tmp file is deleted on exit.
So , if you're unlucky like me, you will get a OOM Exception caused by java.io.DeleteOnExitHook! 
Here's the evidence:


God, we really need a full description of how every parameter works.



Haitao Yao
yao.erix@gmail.com
weibo: @haitao_yao
Skype:  haitao.yao.final

在 2012-7-10,下午4:20, Haitao Yao 写道:

> I found the solution.
> 
> After analyzing the heap dump while the reducer OOM, I found out the memory is consumed by org.apache.pig.data.InternalCachedBag , here's the diagram: 
> <cc.jpg>
> 
> In the source code of org.apache.pig.data.InternalCachedBag, I found out there's a parameter for the cache limit: 
>  public InternalCachedBag(int bagCount) {       
>         float percent = 0.2F;
>         
>     	if (PigMapReduce.sJobConfInternal.get() != null) {
> 		// here, the cache limit is from here! 
>     		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
>     		if (usage != null) {
>     			percent = Float.parseFloat(usage);
>     		}
>     	}
> 
>         init(bagCount, percent);
>     }  
>     private void init(int bagCount, float percent) {
>     	factory = TupleFactory.getInstance();        
>     	mContents = new ArrayList<Tuple>();             
>              	 
>     	long max = Runtime.getRuntime().maxMemory();
>         maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
>         cacheLimit = Integer.MAX_VALUE;
>         
>         // set limit to 0, if memusage is 0 or really really small.
>         // then all tuples are put into disk
>         if (maxMemUsage < 1) {
>         	cacheLimit = 0;
>         }
>         log.warn("cacheLimit: " + this.cacheLimit);
>         addDone = false;
>     }
> 
> so, after write pig.cachedbag.memusage=0 into $PIG_HOME/conf/pig.properties, my job successes!
> 
> You can also set to an appropriate value to fully utilize your memory as a cache.
> 
> Hope this is useful for others.
> Thanks.
> 
> 
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
> 
> 在 2012-7-10,下午1:06, Haitao Yao 写道:
> 
>> my reducers get 512 MB, -Xms512M -Xmx512M. 
>> The reducer does not get OOM when manually invoke spill in my case. 
>> 
>> Can you explain more about your solution? 
>> And can your solution fit into 512MB reducer process?
>> Thanks very much.
>> 
>> 
>> 
>> Haitao Yao
>> yao.erix@gmail.com
>> weibo: @haitao_yao
>> Skype:  haitao.yao.final
>> 
>> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
>> 
>>> I have something in the mix that should reduce bag memory :) Question: how
>>> much memory are your reducers getting? In my experience, you'll get OOM's
>>> on spilling if you have allocated less than a gig to the JVM
>>> 
>>> 2012/7/9 Haitao Yao <ya...@gmail.com>
>>> 
>>>> I have encountered the similar problem.  And I got a OOM while running the
>>>> reducer.
>>>> I think the reason is the data bag generated after group all is too big to
>>>> fit into the reducer's memory.
>>>> 
>>>> and I have written a new COUNT implementation with explicit invoke
>>>> System.gc() and spill  after the COUNT function finish its job, but it
>>>> still get OOM
>>>> 
>>>> here's the code of the new COUNT implementation:
>>>>        @Override
>>>>        public Long exec(Tuple input) throws IOException {
>>>>                DataBag bag = (DataBag)input.get(0);
>>>>                Long result = super.exec(input);
>>>>                LOG.warn(" before spill data bag memory : " +
>>>> Runtime.getRuntime().freeMemory());
>>>>                bag.spill();
>>>>                System.gc();
>>>>                LOG.warn(" after spill data bag memory : " +
>>>> Runtime.getRuntime().freeMemory());
>>>>                LOG.warn("big bag size: " + bag.size() + ", hashcode: " +
>>>> bag.hashCode());
>>>>                return result;
>>>>        }
>>>> 
>>>> 
>>>> I think we have to redesign the data bag implementation with less memory
>>>> consumed.
>>>> 
>>>> 
>>>> 
>>>> Haitao Yao
>>>> yao.erix@gmail.com
>>>> weibo: @haitao_yao
>>>> Skype:  haitao.yao.final
>>>> 
>>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>>>> 
>>>>> the pig script:
>>>>> 
>>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>>> 
>>>>> grpall = group longDesc all;
>>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>>> explain cnt;
>>>>> 
>>>>> 
>>>>> the dump relation result:
>>>>> 
>>>>> #-----------------------------------------------
>>>>> # New Logical Plan:
>>>>> #-----------------------------------------------
>>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>>> |
>>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>>   |   |
>>>>>   |   (Name: LOGenerate[false] Schema:
>>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>>   |   |   |
>>>>>   |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
>>>>> 65)
>>>>>   |   |   |
>>>>>   |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
>>>>> (*))
>>>>>   |   |
>>>>>   |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>>> 
>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>>   |
>>>>>   |---grpall: (Name: LOCogroup Schema:
>>>>> 
>>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>>       |   |
>>>>>       |   (Name: Constant Type: chararray Uid: 62)
>>>>>       |
>>>>>       |---longDesc: (Name: LOLoad Schema:
>>>>> 
>>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>>> 
>>>>> #-----------------------------------------------
>>>>> # Physical Plan:
>>>>> #-----------------------------------------------
>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>> |
>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>   |   |
>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>>   |   |
>>>>>   |   |---Project[bag][1] - scope-5
>>>>>   |
>>>>>   |---grpall: Package[tuple]{chararray} - scope-2
>>>>>       |
>>>>>       |---grpall: Global Rearrange[tuple] - scope-1
>>>>>           |
>>>>>           |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>>>>>               |   |
>>>>>               |   Constant(all) - scope-4
>>>>>               |
>>>>>               |---longDesc:
>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>>> 
>>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
>>>>> File concatenation threshold: 100 optimistic? false
>>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>>> 
>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>>> - Choosing to move algebraic foreach to combiner
>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>> 
>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>> - MR plan size before optimization: 1
>>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>>> 
>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>>> - MR plan size after optimization: 1
>>>>> #--------------------------------------------------
>>>>> # Map Reduce Plan
>>>>> #--------------------------------------------------
>>>>> MapReduce node scope-10
>>>>> Map Plan
>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>>> |   |
>>>>> |   Project[chararray][0] - scope-23
>>>>> |
>>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>>   |   |
>>>>>   |   Project[chararray][0] - scope-12
>>>>>   |   |
>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>>>>>   |   |
>>>>>   |   |---Project[bag][1] - scope-14
>>>>>   |
>>>>>   |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>>       |
>>>>>       |---longDesc:
>>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
>>>>> Combine Plan
>>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>>> |   |
>>>>> |   Project[chararray][0] - scope-27
>>>>> |
>>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>>   |   |
>>>>>   |   Project[chararray][0] - scope-16
>>>>>   |   |
>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>>>>> scope-17
>>>>>   |   |
>>>>>   |   |---Project[bag][1] - scope-18
>>>>>   |
>>>>>   |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>>> Reduce Plan
>>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>>> |
>>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>>   |   |
>>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>>   |   |
>>>>>   |   |---Project[bag][1] - scope-19
>>>>>   |
>>>>>   |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>>> Global sort: false
>>>>> ----------------
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> instead of doing "dump relation," do "explain relation" (then run
>>>>>> identically) and paste the output here. It will show whether the
>>>> combiner
>>>>>> is being used,
>>>>>> 
>>>>>> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>>>> forces combiner. You should make sure that combiner is really used
>>>>>>> here. It can be disabled in some situations. I've encountered such
>>>>>>> situations many times when a job is tooo heavy in case no combiner is
>>>>>>> applied.
>>>>>>> 
>>>>>>> Ruslan
>>>>>>> 
>>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
>>>>>> wrote:
>>>>>>>> Right!!
>>>>>>>> 
>>>>>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>>>>>> 'group all'. How can that be confirmed?
>>>>>>>> 
>>>>>>>> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
>>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and as such,
>>>>>>> will
>>>>>>>>> use combiners, so it is generally quite fast.
>>>>>>>>> 
>>>>>>>>> 2012/7/2 Subir S <su...@gmail.com>
>>>>>>>>> 
>>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count per
>>>> group
>>>>>>>>>> and sum may be.
>>>>>>>>>> 
>>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>>>> 
>>>>>>>>>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>>>>>> records.
>>>>>>>>>>> 
>>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>>> 
>>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>>> dump cnt_filter;
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> but sometimes, when the records get larger, it takes lots of time
>>>>>> and
>>>>>>>>>> hang
>>>>>>>>>>> up, and or die.
>>>>>>>>>>> I thought counting should be simple enough, so what is the best way
>>>>>>> to
>>>>>>>>>> do a
>>>>>>>>>>> counting in pig?
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> 
>>>>>>>>>>> Sheng
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Ruslan Al-Fakikh
>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
> 


Re: What is the best way to do counting in pig?

Posted by Haitao Yao <ya...@gmail.com>.
I found the solution.

After analyzing the heap dump while the reducer OOM, I found out the memory is consumed by org.apache.pig.data.InternalCachedBag , here's the diagram: 


In the source code of org.apache.pig.data.InternalCachedBag, I found out there's a parameter for the cache limit: 
 public InternalCachedBag(int bagCount) {       
        float percent = 0.2F;
        
    	if (PigMapReduce.sJobConfInternal.get() != null) {
		// here, the cache limit is from here! 
    		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
    		if (usage != null) {
    			percent = Float.parseFloat(usage);
    		}
    	}

        init(bagCount, percent);
    }  
    private void init(int bagCount, float percent) {
    	factory = TupleFactory.getInstance();        
    	mContents = new ArrayList<Tuple>();             
             	 
    	long max = Runtime.getRuntime().maxMemory();
        maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
        cacheLimit = Integer.MAX_VALUE;
        
        // set limit to 0, if memusage is 0 or really really small.
        // then all tuples are put into disk
        if (maxMemUsage < 1) {
        	cacheLimit = 0;
        }
        log.warn("cacheLimit: " + this.cacheLimit);
        addDone = false;
    }

so, after write pig.cachedbag.memusage=0 into $PIG_HOME/conf/pig.properties, my job successes!

You can also set to an appropriate value to fully utilize your memory as a cache.

Hope this is useful for others.
Thanks.


Haitao Yao
yao.erix@gmail.com
weibo: @haitao_yao
Skype:  haitao.yao.final

在 2012-7-10,下午1:06, Haitao Yao 写道:

> my reducers get 512 MB, -Xms512M -Xmx512M. 
> The reducer does not get OOM when manually invoke spill in my case. 
> 
> Can you explain more about your solution? 
> And can your solution fit into 512MB reducer process?
> Thanks very much.
> 
> 
> 
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
> 
> 在 2012-7-10,下午12:26, Jonathan Coveney 写道:
> 
>> I have something in the mix that should reduce bag memory :) Question: how
>> much memory are your reducers getting? In my experience, you'll get OOM's
>> on spilling if you have allocated less than a gig to the JVM
>> 
>> 2012/7/9 Haitao Yao <ya...@gmail.com>
>> 
>>> I have encountered the similar problem.  And I got a OOM while running the
>>> reducer.
>>> I think the reason is the data bag generated after group all is too big to
>>> fit into the reducer's memory.
>>> 
>>> and I have written a new COUNT implementation with explicit invoke
>>> System.gc() and spill  after the COUNT function finish its job, but it
>>> still get OOM
>>> 
>>> here's the code of the new COUNT implementation:
>>>        @Override
>>>        public Long exec(Tuple input) throws IOException {
>>>                DataBag bag = (DataBag)input.get(0);
>>>                Long result = super.exec(input);
>>>                LOG.warn(" before spill data bag memory : " +
>>> Runtime.getRuntime().freeMemory());
>>>                bag.spill();
>>>                System.gc();
>>>                LOG.warn(" after spill data bag memory : " +
>>> Runtime.getRuntime().freeMemory());
>>>                LOG.warn("big bag size: " + bag.size() + ", hashcode: " +
>>> bag.hashCode());
>>>                return result;
>>>        }
>>> 
>>> 
>>> I think we have to redesign the data bag implementation with less memory
>>> consumed.
>>> 
>>> 
>>> 
>>> Haitao Yao
>>> yao.erix@gmail.com
>>> weibo: @haitao_yao
>>> Skype:  haitao.yao.final
>>> 
>>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>>> 
>>>> the pig script:
>>>> 
>>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>>> 
>>>> grpall = group longDesc all;
>>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>>> explain cnt;
>>>> 
>>>> 
>>>> the dump relation result:
>>>> 
>>>> #-----------------------------------------------
>>>> # New Logical Plan:
>>>> #-----------------------------------------------
>>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>>> |
>>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>>   |   |
>>>>   |   (Name: LOGenerate[false] Schema:
>>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>>   |   |   |
>>>>   |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
>>>> 65)
>>>>   |   |   |
>>>>   |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
>>>> (*))
>>>>   |   |
>>>>   |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>>> 
>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>>   |
>>>>   |---grpall: (Name: LOCogroup Schema:
>>>> 
>>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>>       |   |
>>>>       |   (Name: Constant Type: chararray Uid: 62)
>>>>       |
>>>>       |---longDesc: (Name: LOLoad Schema:
>>>> 
>>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>>> 
>>>> #-----------------------------------------------
>>>> # Physical Plan:
>>>> #-----------------------------------------------
>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>> |
>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>   |   |
>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>>   |   |
>>>>   |   |---Project[bag][1] - scope-5
>>>>   |
>>>>   |---grpall: Package[tuple]{chararray} - scope-2
>>>>       |
>>>>       |---grpall: Global Rearrange[tuple] - scope-1
>>>>           |
>>>>           |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>>>>               |   |
>>>>               |   Constant(all) - scope-4
>>>>               |
>>>>               |---longDesc:
>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>>> 
>>>> 2012-07-09 15:47:02,441 [main] INFO
>>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
>>>> File concatenation threshold: 100 optimistic? false
>>>> 2012-07-09 15:47:02,448 [main] INFO
>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>>> - Choosing to move algebraic foreach to combiner
>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>> - MR plan size before optimization: 1
>>>> 2012-07-09 15:47:02,581 [main] INFO
>>>> 
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>>> - MR plan size after optimization: 1
>>>> #--------------------------------------------------
>>>> # Map Reduce Plan
>>>> #--------------------------------------------------
>>>> MapReduce node scope-10
>>>> Map Plan
>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>>> |   |
>>>> |   Project[chararray][0] - scope-23
>>>> |
>>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>>   |   |
>>>>   |   Project[chararray][0] - scope-12
>>>>   |   |
>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>>>>   |   |
>>>>   |   |---Project[bag][1] - scope-14
>>>>   |
>>>>   |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>>       |
>>>>       |---longDesc:
>>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
>>>> Combine Plan
>>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>>> |   |
>>>> |   Project[chararray][0] - scope-27
>>>> |
>>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>>   |   |
>>>>   |   Project[chararray][0] - scope-16
>>>>   |   |
>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>>>> scope-17
>>>>   |   |
>>>>   |   |---Project[bag][1] - scope-18
>>>>   |
>>>>   |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>>> Reduce Plan
>>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>>> |
>>>> |---cnt: New For Each(false)[bag] - scope-8
>>>>   |   |
>>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>>   |   |
>>>>   |   |---Project[bag][1] - scope-19
>>>>   |
>>>>   |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>>> Global sort: false
>>>> ----------------
>>>> 
>>>> 
>>>> 
>>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com>
>>> wrote:
>>>> 
>>>>> instead of doing "dump relation," do "explain relation" (then run
>>>>> identically) and paste the output here. It will show whether the
>>> combiner
>>>>> is being used,
>>>>> 
>>>>> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>>> forces combiner. You should make sure that combiner is really used
>>>>>> here. It can be disabled in some situations. I've encountered such
>>>>>> situations many times when a job is tooo heavy in case no combiner is
>>>>>> applied.
>>>>>> 
>>>>>> Ruslan
>>>>>> 
>>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
>>>>> wrote:
>>>>>>> Right!!
>>>>>>> 
>>>>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>>>>> 'group all'. How can that be confirmed?
>>>>>>> 
>>>>>>> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
>>>>>>>> group all uses a single reducer, but COUNT is algebraic, and as such,
>>>>>> will
>>>>>>>> use combiners, so it is generally quite fast.
>>>>>>>> 
>>>>>>>> 2012/7/2 Subir S <su...@gmail.com>
>>>>>>>> 
>>>>>>>>> Group all - uses single reducer AFAIU. You can try to count per
>>> group
>>>>>>>>> and sum may be.
>>>>>>>>> 
>>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>>> 
>>>>>>>>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>>>>>>>>>> Hi all,
>>>>>>>>>> 
>>>>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>>>>> records.
>>>>>>>>>> 
>>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>>> 
>>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>>> dump cnt_filter;
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> but sometimes, when the records get larger, it takes lots of time
>>>>> and
>>>>>>>>> hang
>>>>>>>>>> up, and or die.
>>>>>>>>>> I thought counting should be simple enough, so what is the best way
>>>>>> to
>>>>>>>>> do a
>>>>>>>>>> counting in pig?
>>>>>>>>>> 
>>>>>>>>>> Thanks!
>>>>>>>>>> 
>>>>>>>>>> Sheng
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ruslan Al-Fakikh
>>>>>> 
>>>>> 
>>> 
>>> 
> 


Re: What is the best way to do counting in pig?

Posted by Haitao Yao <ya...@gmail.com>.
my reducers get 512 MB, -Xms512M -Xmx512M. 
The reducer does not get OOM when manually invoke spill in my case. 

Can you explain more about your solution? 
And can your solution fit into 512MB reducer process?
Thanks very much.



Haitao Yao
yao.erix@gmail.com
weibo: @haitao_yao
Skype:  haitao.yao.final

在 2012-7-10,下午12:26, Jonathan Coveney 写道:

> I have something in the mix that should reduce bag memory :) Question: how
> much memory are your reducers getting? In my experience, you'll get OOM's
> on spilling if you have allocated less than a gig to the JVM
> 
> 2012/7/9 Haitao Yao <ya...@gmail.com>
> 
>> I have encountered the similar problem.  And I got a OOM while running the
>> reducer.
>> I think the reason is the data bag generated after group all is too big to
>> fit into the reducer's memory.
>> 
>> and I have written a new COUNT implementation with explicit invoke
>> System.gc() and spill  after the COUNT function finish its job, but it
>> still get OOM
>> 
>> here's the code of the new COUNT implementation:
>>        @Override
>>        public Long exec(Tuple input) throws IOException {
>>                DataBag bag = (DataBag)input.get(0);
>>                Long result = super.exec(input);
>>                LOG.warn(" before spill data bag memory : " +
>> Runtime.getRuntime().freeMemory());
>>                bag.spill();
>>                System.gc();
>>                LOG.warn(" after spill data bag memory : " +
>> Runtime.getRuntime().freeMemory());
>>                LOG.warn("big bag size: " + bag.size() + ", hashcode: " +
>> bag.hashCode());
>>                return result;
>>        }
>> 
>> 
>> I think we have to redesign the data bag implementation with less memory
>> consumed.
>> 
>> 
>> 
>> Haitao Yao
>> yao.erix@gmail.com
>> weibo: @haitao_yao
>> Skype:  haitao.yao.final
>> 
>> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>> 
>>> the pig script:
>>> 
>>> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
>>> 
>>> grpall = group longDesc all;
>>> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
>>> explain cnt;
>>> 
>>> 
>>> the dump relation result:
>>> 
>>> #-----------------------------------------------
>>> # New Logical Plan:
>>> #-----------------------------------------------
>>> cnt: (Name: LOStore Schema: allNumber#65:long)
>>> |
>>> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>>>   |   |
>>>   |   (Name: LOGenerate[false] Schema:
>>> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>>>   |   |   |
>>>   |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
>>> 65)
>>>   |   |   |
>>>   |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
>>> (*))
>>>   |   |
>>>   |   |---longDesc: (Name: LOInnerLoad[1] Schema:
>>> 
>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>>>   |
>>>   |---grpall: (Name: LOCogroup Schema:
>>> 
>> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>>>       |   |
>>>       |   (Name: Constant Type: chararray Uid: 62)
>>>       |
>>>       |---longDesc: (Name: LOLoad Schema:
>>> 
>> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
>>> 
>>> #-----------------------------------------------
>>> # Physical Plan:
>>> #-----------------------------------------------
>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>> |
>>> |---cnt: New For Each(false)[bag] - scope-8
>>>   |   |
>>>   |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>>>   |   |
>>>   |   |---Project[bag][1] - scope-5
>>>   |
>>>   |---grpall: Package[tuple]{chararray} - scope-2
>>>       |
>>>       |---grpall: Global Rearrange[tuple] - scope-1
>>>           |
>>>           |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>>>               |   |
>>>               |   Constant(all) - scope-4
>>>               |
>>>               |---longDesc:
>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
>>> 
>>> 2012-07-09 15:47:02,441 [main] INFO
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
>>> File concatenation threshold: 100 optimistic? false
>>> 2012-07-09 15:47:02,448 [main] INFO
>>> 
>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
>>> - Choosing to move algebraic foreach to combiner
>>> 2012-07-09 15:47:02,581 [main] INFO
>>> 
>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>> - MR plan size before optimization: 1
>>> 2012-07-09 15:47:02,581 [main] INFO
>>> 
>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
>>> - MR plan size after optimization: 1
>>> #--------------------------------------------------
>>> # Map Reduce Plan
>>> #--------------------------------------------------
>>> MapReduce node scope-10
>>> Map Plan
>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
>>> |   |
>>> |   Project[chararray][0] - scope-23
>>> |
>>> |---cnt: New For Each(false,false)[bag] - scope-11
>>>   |   |
>>>   |   Project[chararray][0] - scope-12
>>>   |   |
>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>>>   |   |
>>>   |   |---Project[bag][1] - scope-14
>>>   |
>>>   |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>>>       |
>>>       |---longDesc:
>>> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
>>> Combine Plan
>>> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
>>> |   |
>>> |   Project[chararray][0] - scope-27
>>> |
>>> |---cnt: New For Each(false,false)[bag] - scope-15
>>>   |   |
>>>   |   Project[chararray][0] - scope-16
>>>   |   |
>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
>>> scope-17
>>>   |   |
>>>   |   |---Project[bag][1] - scope-18
>>>   |
>>>   |---POCombinerPackage[tuple]{chararray} - scope-20--------
>>> Reduce Plan
>>> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
>>> |
>>> |---cnt: New For Each(false)[bag] - scope-8
>>>   |   |
>>>   |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>>>   |   |
>>>   |   |---Project[bag][1] - scope-19
>>>   |
>>>   |---POCombinerPackage[tuple]{chararray} - scope-28--------
>>> Global sort: false
>>> ----------------
>>> 
>>> 
>>> 
>>> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com>
>> wrote:
>>> 
>>>> instead of doing "dump relation," do "explain relation" (then run
>>>> identically) and paste the output here. It will show whether the
>> combiner
>>>> is being used,
>>>> 
>>>> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
>>>> 
>>>>> Hi,
>>>>> 
>>>>> As it was said, COUNT is algebraic and should be fast, because it
>>>>> forces combiner. You should make sure that combiner is really used
>>>>> here. It can be disabled in some situations. I've encountered such
>>>>> situations many times when a job is tooo heavy in case no combiner is
>>>>> applied.
>>>>> 
>>>>> Ruslan
>>>>> 
>>>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
>>>> wrote:
>>>>>> Right!!
>>>>>> 
>>>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>>>> 'group all'. How can that be confirmed?
>>>>>> 
>>>>>> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
>>>>>>> group all uses a single reducer, but COUNT is algebraic, and as such,
>>>>> will
>>>>>>> use combiners, so it is generally quite fast.
>>>>>>> 
>>>>>>> 2012/7/2 Subir S <su...@gmail.com>
>>>>>>> 
>>>>>>>> Group all - uses single reducer AFAIU. You can try to count per
>> group
>>>>>>>> and sum may be.
>>>>>>>> 
>>>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>>>> 
>>>>>>>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>>>>>>>>> Hi all,
>>>>>>>>> 
>>>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>>>> records.
>>>>>>>>> 
>>>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>>>> grpd = group m_skill_group all;
>>>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>>>> 
>>>>>>>>> cnt_filter = limit cnt 10;
>>>>>>>>> dump cnt_filter;
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> but sometimes, when the records get larger, it takes lots of time
>>>> and
>>>>>>>> hang
>>>>>>>>> up, and or die.
>>>>>>>>> I thought counting should be simple enough, so what is the best way
>>>>> to
>>>>>>>> do a
>>>>>>>>> counting in pig?
>>>>>>>>> 
>>>>>>>>> Thanks!
>>>>>>>>> 
>>>>>>>>> Sheng
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Best Regards,
>>>>> Ruslan Al-Fakikh
>>>>> 
>>>> 
>> 
>> 


Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
I have something in the mix that should reduce bag memory :) Question: how
much memory are your reducers getting? In my experience, you'll get OOM's
on spilling if you have allocated less than a gig to the JVM

2012/7/9 Haitao Yao <ya...@gmail.com>

> I have encountered the similar problem.  And I got a OOM while running the
> reducer.
> I think the reason is the data bag generated after group all is too big to
> fit into the reducer's memory.
>
> and I have written a new COUNT implementation with explicit invoke
> System.gc() and spill  after the COUNT function finish its job, but it
> still get OOM
>
> here's the code of the new COUNT implementation:
>         @Override
>         public Long exec(Tuple input) throws IOException {
>                 DataBag bag = (DataBag)input.get(0);
>                 Long result = super.exec(input);
>                 LOG.warn(" before spill data bag memory : " +
> Runtime.getRuntime().freeMemory());
>                 bag.spill();
>                 System.gc();
>                 LOG.warn(" after spill data bag memory : " +
> Runtime.getRuntime().freeMemory());
>                 LOG.warn("big bag size: " + bag.size() + ", hashcode: " +
> bag.hashCode());
>                 return result;
>         }
>
>
> I think we have to redesign the data bag implementation with less memory
> consumed.
>
>
>
> Haitao Yao
> yao.erix@gmail.com
> weibo: @haitao_yao
> Skype:  haitao.yao.final
>
> 在 2012-7-10,上午6:54, Sheng Guo 写道:
>
> > the pig script:
> >
> > longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
> >
> > grpall = group longDesc all;
> > cnt = foreach grpall generate COUNT(longDesc) as allNumber;
> > explain cnt;
> >
> >
> > the dump relation result:
> >
> > #-----------------------------------------------
> > # New Logical Plan:
> > #-----------------------------------------------
> > cnt: (Name: LOStore Schema: allNumber#65:long)
> > |
> > |---cnt: (Name: LOForEach Schema: allNumber#65:long)
> >    |   |
> >    |   (Name: LOGenerate[false] Schema:
> > allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
> >    |   |   |
> >    |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
> > 65)
> >    |   |   |
> >    |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
> > (*))
> >    |   |
> >    |   |---longDesc: (Name: LOInnerLoad[1] Schema:
> >
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
> >    |
> >    |---grpall: (Name: LOCogroup Schema:
> >
> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
> >        |   |
> >        |   (Name: Constant Type: chararray Uid: 62)
> >        |
> >        |---longDesc: (Name: LOLoad Schema:
> >
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
> >
> > #-----------------------------------------------
> > # Physical Plan:
> > #-----------------------------------------------
> > cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
> > |
> > |---cnt: New For Each(false)[bag] - scope-8
> >    |   |
> >    |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
> >    |   |
> >    |   |---Project[bag][1] - scope-5
> >    |
> >    |---grpall: Package[tuple]{chararray} - scope-2
> >        |
> >        |---grpall: Global Rearrange[tuple] - scope-1
> >            |
> >            |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
> >                |   |
> >                |   Constant(all) - scope-4
> >                |
> >                |---longDesc:
> > Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
> >
> > 2012-07-09 15:47:02,441 [main] INFO
> > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
> > File concatenation threshold: 100 optimistic? false
> > 2012-07-09 15:47:02,448 [main] INFO
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
> > - Choosing to move algebraic foreach to combiner
> > 2012-07-09 15:47:02,581 [main] INFO
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
> > - MR plan size before optimization: 1
> > 2012-07-09 15:47:02,581 [main] INFO
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
> > - MR plan size after optimization: 1
> > #--------------------------------------------------
> > # Map Reduce Plan
> > #--------------------------------------------------
> > MapReduce node scope-10
> > Map Plan
> > grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
> > |   |
> > |   Project[chararray][0] - scope-23
> > |
> > |---cnt: New For Each(false,false)[bag] - scope-11
> >    |   |
> >    |   Project[chararray][0] - scope-12
> >    |   |
> >    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
> >    |   |
> >    |   |---Project[bag][1] - scope-14
> >    |
> >    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
> >        |
> >        |---longDesc:
> > Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
> > Combine Plan
> > grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
> > |   |
> > |   Project[chararray][0] - scope-27
> > |
> > |---cnt: New For Each(false,false)[bag] - scope-15
> >    |   |
> >    |   Project[chararray][0] - scope-16
> >    |   |
> >    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
> > scope-17
> >    |   |
> >    |   |---Project[bag][1] - scope-18
> >    |
> >    |---POCombinerPackage[tuple]{chararray} - scope-20--------
> > Reduce Plan
> > cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
> > |
> > |---cnt: New For Each(false)[bag] - scope-8
> >    |   |
> >    |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
> >    |   |
> >    |   |---Project[bag][1] - scope-19
> >    |
> >    |---POCombinerPackage[tuple]{chararray} - scope-28--------
> > Global sort: false
> > ----------------
> >
> >
> >
> > On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com>
> wrote:
> >
> >> instead of doing "dump relation," do "explain relation" (then run
> >> identically) and paste the output here. It will show whether the
> combiner
> >> is being used,
> >>
> >> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
> >>
> >>> Hi,
> >>>
> >>> As it was said, COUNT is algebraic and should be fast, because it
> >>> forces combiner. You should make sure that combiner is really used
> >>> here. It can be disabled in some situations. I've encountered such
> >>> situations many times when a job is tooo heavy in case no combiner is
> >>> applied.
> >>>
> >>> Ruslan
> >>>
> >>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
> >> wrote:
> >>>> Right!!
> >>>>
> >>>> Since it is mentioned that job is hanging, wild guess is it must be
> >>>> 'group all'. How can that be confirmed?
> >>>>
> >>>> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
> >>>>> group all uses a single reducer, but COUNT is algebraic, and as such,
> >>> will
> >>>>> use combiners, so it is generally quite fast.
> >>>>>
> >>>>> 2012/7/2 Subir S <su...@gmail.com>
> >>>>>
> >>>>>> Group all - uses single reducer AFAIU. You can try to count per
> group
> >>>>>> and sum may be.
> >>>>>>
> >>>>>> You may also try with COUNT_STAR to include NULL fields.
> >>>>>>
> >>>>>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I used to use the following pig script to do the counting of the
> >>>>>>> records.
> >>>>>>>
> >>>>>>> m_skill_group = group m_skills_filter by member_id;
> >>>>>>> grpd = group m_skill_group all;
> >>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
> >>>>>>>
> >>>>>>> cnt_filter = limit cnt 10;
> >>>>>>> dump cnt_filter;
> >>>>>>>
> >>>>>>>
> >>>>>>> but sometimes, when the records get larger, it takes lots of time
> >> and
> >>>>>> hang
> >>>>>>> up, and or die.
> >>>>>>> I thought counting should be simple enough, so what is the best way
> >>> to
> >>>>>> do a
> >>>>>>> counting in pig?
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>>
> >>>>>>> Sheng
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
> >>>
> >>> --
> >>> Best Regards,
> >>> Ruslan Al-Fakikh
> >>>
> >>
>
>

Re: What is the best way to do counting in pig?

Posted by Haitao Yao <ya...@gmail.com>.
I have encountered the similar problem.  And I got a OOM while running the reducer.
I think the reason is the data bag generated after group all is too big to fit into the reducer's memory.

and I have written a new COUNT implementation with explicit invoke System.gc() and spill  after the COUNT function finish its job, but it still get OOM

here's the code of the new COUNT implementation:
	@Override
	public Long exec(Tuple input) throws IOException {
		DataBag bag = (DataBag)input.get(0);
		Long result = super.exec(input);
		LOG.warn(" before spill data bag memory : " + Runtime.getRuntime().freeMemory());
		bag.spill();
		System.gc();
		LOG.warn(" after spill data bag memory : " + Runtime.getRuntime().freeMemory());
		LOG.warn("big bag size: " + bag.size() + ", hashcode: " + bag.hashCode());
		return result;
	}


I think we have to redesign the data bag implementation with less memory consumed.



Haitao Yao
yao.erix@gmail.com
weibo: @haitao_yao
Skype:  haitao.yao.final

在 2012-7-10,上午6:54, Sheng Guo 写道:

> the pig script:
> 
> longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();
> 
> grpall = group longDesc all;
> cnt = foreach grpall generate COUNT(longDesc) as allNumber;
> explain cnt;
> 
> 
> the dump relation result:
> 
> #-----------------------------------------------
> # New Logical Plan:
> #-----------------------------------------------
> cnt: (Name: LOStore Schema: allNumber#65:long)
> |
> |---cnt: (Name: LOForEach Schema: allNumber#65:long)
>    |   |
>    |   (Name: LOGenerate[false] Schema:
> allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
>    |   |   |
>    |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
> 65)
>    |   |   |
>    |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
> (*))
>    |   |
>    |   |---longDesc: (Name: LOInnerLoad[1] Schema:
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
>    |
>    |---grpall: (Name: LOCogroup Schema:
> group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
>        |   |
>        |   (Name: Constant Type: chararray Uid: 62)
>        |
>        |---longDesc: (Name: LOLoad Schema:
> DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null
> 
> #-----------------------------------------------
> # Physical Plan:
> #-----------------------------------------------
> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
> |
> |---cnt: New For Each(false)[bag] - scope-8
>    |   |
>    |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
>    |   |
>    |   |---Project[bag][1] - scope-5
>    |
>    |---grpall: Package[tuple]{chararray} - scope-2
>        |
>        |---grpall: Global Rearrange[tuple] - scope-1
>            |
>            |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
>                |   |
>                |   Constant(all) - scope-4
>                |
>                |---longDesc:
> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0
> 
> 2012-07-09 15:47:02,441 [main] INFO
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
> File concatenation threshold: 100 optimistic? false
> 2012-07-09 15:47:02,448 [main] INFO
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
> - Choosing to move algebraic foreach to combiner
> 2012-07-09 15:47:02,581 [main] INFO
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
> - MR plan size before optimization: 1
> 2012-07-09 15:47:02,581 [main] INFO
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
> - MR plan size after optimization: 1
> #--------------------------------------------------
> # Map Reduce Plan
> #--------------------------------------------------
> MapReduce node scope-10
> Map Plan
> grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
> |   |
> |   Project[chararray][0] - scope-23
> |
> |---cnt: New For Each(false,false)[bag] - scope-11
>    |   |
>    |   Project[chararray][0] - scope-12
>    |   |
>    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
>    |   |
>    |   |---Project[bag][1] - scope-14
>    |
>    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
>        |
>        |---longDesc:
> Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
> Combine Plan
> grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
> |   |
> |   Project[chararray][0] - scope-27
> |
> |---cnt: New For Each(false,false)[bag] - scope-15
>    |   |
>    |   Project[chararray][0] - scope-16
>    |   |
>    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
> scope-17
>    |   |
>    |   |---Project[bag][1] - scope-18
>    |
>    |---POCombinerPackage[tuple]{chararray} - scope-20--------
> Reduce Plan
> cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
> |
> |---cnt: New For Each(false)[bag] - scope-8
>    |   |
>    |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
>    |   |
>    |   |---Project[bag][1] - scope-19
>    |
>    |---POCombinerPackage[tuple]{chararray} - scope-28--------
> Global sort: false
> ----------------
> 
> 
> 
> On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com> wrote:
> 
>> instead of doing "dump relation," do "explain relation" (then run
>> identically) and paste the output here. It will show whether the combiner
>> is being used,
>> 
>> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
>> 
>>> Hi,
>>> 
>>> As it was said, COUNT is algebraic and should be fast, because it
>>> forces combiner. You should make sure that combiner is really used
>>> here. It can be disabled in some situations. I've encountered such
>>> situations many times when a job is tooo heavy in case no combiner is
>>> applied.
>>> 
>>> Ruslan
>>> 
>>> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
>> wrote:
>>>> Right!!
>>>> 
>>>> Since it is mentioned that job is hanging, wild guess is it must be
>>>> 'group all'. How can that be confirmed?
>>>> 
>>>> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
>>>>> group all uses a single reducer, but COUNT is algebraic, and as such,
>>> will
>>>>> use combiners, so it is generally quite fast.
>>>>> 
>>>>> 2012/7/2 Subir S <su...@gmail.com>
>>>>> 
>>>>>> Group all - uses single reducer AFAIU. You can try to count per group
>>>>>> and sum may be.
>>>>>> 
>>>>>> You may also try with COUNT_STAR to include NULL fields.
>>>>>> 
>>>>>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> I used to use the following pig script to do the counting of the
>>>>>>> records.
>>>>>>> 
>>>>>>> m_skill_group = group m_skills_filter by member_id;
>>>>>>> grpd = group m_skill_group all;
>>>>>>> cnt = foreach grpd generate COUNT(m_skill_group);
>>>>>>> 
>>>>>>> cnt_filter = limit cnt 10;
>>>>>>> dump cnt_filter;
>>>>>>> 
>>>>>>> 
>>>>>>> but sometimes, when the records get larger, it takes lots of time
>> and
>>>>>> hang
>>>>>>> up, and or die.
>>>>>>> I thought counting should be simple enough, so what is the best way
>>> to
>>>>>> do a
>>>>>>> counting in pig?
>>>>>>> 
>>>>>>> Thanks!
>>>>>>> 
>>>>>>> Sheng
>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
>>> 
>>> --
>>> Best Regards,
>>> Ruslan Al-Fakikh
>>> 
>> 


Re: What is the best way to do counting in pig?

Posted by Sheng Guo <en...@gmail.com>.
the pig script:

longDesc = load '/user/xx/filtered_chunk' USING AvroStorage();

grpall = group longDesc all;
cnt = foreach grpall generate COUNT(longDesc) as allNumber;
explain cnt;


the dump relation result:

#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
cnt: (Name: LOStore Schema: allNumber#65:long)
|
|---cnt: (Name: LOForEach Schema: allNumber#65:long)
    |   |
    |   (Name: LOGenerate[false] Schema:
allNumber#65:long)ColumnPrune:InputUids=[63]ColumnPrune:OutputUids=[65]
    |   |   |
    |   |   (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid:
65)
    |   |   |
    |   |   |---longDesc:(Name: Project Type: bag Uid: 63 Input: 0 Column:
(*))
    |   |
    |   |---longDesc: (Name: LOInnerLoad[1] Schema:
DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)
    |
    |---grpall: (Name: LOCogroup Schema:
group#62:chararray,longDesc#63:bag{#64:tuple(DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)})
        |   |
        |   (Name: Constant Type: chararray Uid: 62)
        |
        |---longDesc: (Name: LOLoad Schema:
DISCUSSION_ID#41:long,COMMENT_COUNT#42:long,UNIQUE_COMMENTER_COUNT#43:long,ACTIVE_COMMENT_COUNT#44:long,LAST_ACTIVITY_AT#45:long,SUBJECT#46:chararray,SUBJECT_CHUNKS#47:chararray,LOCALE#48:chararray,STATE#49:chararray,DETAIL#50:chararray,DETAIL_CHUNKS#51:chararray,TOPIC_TITLE#52:chararray,TOPIC_TITLE_CHUNKS#53:chararray,TOPIC_DESCRIPTION#54:chararray,TOPIC_DESCRIPTION_CHUNKS#55:chararray,TOPIC_ATTRIBUTES#56:chararray)RequiredFields:null

#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
|
|---cnt: New For Each(false)[bag] - scope-8
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-6
    |   |
    |   |---Project[bag][1] - scope-5
    |
    |---grpall: Package[tuple]{chararray} - scope-2
        |
        |---grpall: Global Rearrange[tuple] - scope-1
            |
            |---grpall: Local Rearrange[tuple]{chararray}(false) - scope-3
                |   |
                |   Constant(all) - scope-4
                |
                |---longDesc:
Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0

2012-07-09 15:47:02,441 [main] INFO
 org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler -
File concatenation threshold: 100 optimistic? false
2012-07-09 15:47:02,448 [main] INFO
 org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer
- Choosing to move algebraic foreach to combiner
2012-07-09 15:47:02,581 [main] INFO
 org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
- MR plan size before optimization: 1
2012-07-09 15:47:02,581 [main] INFO
 org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer
- MR plan size after optimization: 1
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-10
Map Plan
grpall: Local Rearrange[tuple]{chararray}(false) - scope-22
|   |
|   Project[chararray][0] - scope-23
|
|---cnt: New For Each(false,false)[bag] - scope-11
    |   |
    |   Project[chararray][0] - scope-12
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
    |   |
    |   |---Project[bag][1] - scope-14
    |
    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-24
        |
        |---longDesc:
Load(/user/sguo/h2o/group_filtered_chunk:LiAvroStorage) - scope-0--------
Combine Plan
grpall: Local Rearrange[tuple]{chararray}(false) - scope-26
|   |
|   Project[chararray][0] - scope-27
|
|---cnt: New For Each(false,false)[bag] - scope-15
    |   |
    |   Project[chararray][0] - scope-16
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] -
scope-17
    |   |
    |   |---Project[bag][1] - scope-18
    |
    |---POCombinerPackage[tuple]{chararray} - scope-20--------
Reduce Plan
cnt: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-9
|
|---cnt: New For Each(false)[bag] - scope-8
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
    |   |
    |   |---Project[bag][1] - scope-19
    |
    |---POCombinerPackage[tuple]{chararray} - scope-28--------
Global sort: false
----------------



On Tue, Jul 3, 2012 at 9:56 AM, Jonathan Coveney <jc...@gmail.com> wrote:

> instead of doing "dump relation," do "explain relation" (then run
> identically) and paste the output here. It will show whether the combiner
> is being used,
>
> 2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>
>
> > Hi,
> >
> > As it was said, COUNT is algebraic and should be fast, because it
> > forces combiner. You should make sure that combiner is really used
> > here. It can be disabled in some situations. I've encountered such
> > situations many times when a job is tooo heavy in case no combiner is
> > applied.
> >
> > Ruslan
> >
> > On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com>
> wrote:
> > > Right!!
> > >
> > > Since it is mentioned that job is hanging, wild guess is it must be
> > > 'group all'. How can that be confirmed?
> > >
> > > On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
> > >> group all uses a single reducer, but COUNT is algebraic, and as such,
> > will
> > >> use combiners, so it is generally quite fast.
> > >>
> > >> 2012/7/2 Subir S <su...@gmail.com>
> > >>
> > >>> Group all - uses single reducer AFAIU. You can try to count per group
> > >>> and sum may be.
> > >>>
> > >>> You may also try with COUNT_STAR to include NULL fields.
> > >>>
> > >>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
> > >>> > Hi all,
> > >>> >
> > >>> > I used to use the following pig script to do the counting of the
> > >>> > records.
> > >>> >
> > >>> > m_skill_group = group m_skills_filter by member_id;
> > >>> > grpd = group m_skill_group all;
> > >>> > cnt = foreach grpd generate COUNT(m_skill_group);
> > >>> >
> > >>> > cnt_filter = limit cnt 10;
> > >>> > dump cnt_filter;
> > >>> >
> > >>> >
> > >>> > but sometimes, when the records get larger, it takes lots of time
> and
> > >>> hang
> > >>> > up, and or die.
> > >>> > I thought counting should be simple enough, so what is the best way
> > to
> > >>> do a
> > >>> > counting in pig?
> > >>> >
> > >>> > Thanks!
> > >>> >
> > >>> > Sheng
> > >>> >
> > >>>
> > >>
> >
> >
> >
> > --
> > Best Regards,
> > Ruslan Al-Fakikh
> >
>

Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
instead of doing "dump relation," do "explain relation" (then run
identically) and paste the output here. It will show whether the combiner
is being used,

2012/7/3 Ruslan Al-Fakikh <ru...@jalent.ru>

> Hi,
>
> As it was said, COUNT is algebraic and should be fast, because it
> forces combiner. You should make sure that combiner is really used
> here. It can be disabled in some situations. I've encountered such
> situations many times when a job is tooo heavy in case no combiner is
> applied.
>
> Ruslan
>
> On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com> wrote:
> > Right!!
> >
> > Since it is mentioned that job is hanging, wild guess is it must be
> > 'group all'. How can that be confirmed?
> >
> > On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
> >> group all uses a single reducer, but COUNT is algebraic, and as such,
> will
> >> use combiners, so it is generally quite fast.
> >>
> >> 2012/7/2 Subir S <su...@gmail.com>
> >>
> >>> Group all - uses single reducer AFAIU. You can try to count per group
> >>> and sum may be.
> >>>
> >>> You may also try with COUNT_STAR to include NULL fields.
> >>>
> >>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
> >>> > Hi all,
> >>> >
> >>> > I used to use the following pig script to do the counting of the
> >>> > records.
> >>> >
> >>> > m_skill_group = group m_skills_filter by member_id;
> >>> > grpd = group m_skill_group all;
> >>> > cnt = foreach grpd generate COUNT(m_skill_group);
> >>> >
> >>> > cnt_filter = limit cnt 10;
> >>> > dump cnt_filter;
> >>> >
> >>> >
> >>> > but sometimes, when the records get larger, it takes lots of time and
> >>> hang
> >>> > up, and or die.
> >>> > I thought counting should be simple enough, so what is the best way
> to
> >>> do a
> >>> > counting in pig?
> >>> >
> >>> > Thanks!
> >>> >
> >>> > Sheng
> >>> >
> >>>
> >>
>
>
>
> --
> Best Regards,
> Ruslan Al-Fakikh
>

Re: What is the best way to do counting in pig?

Posted by Ruslan Al-Fakikh <ru...@jalent.ru>.
Hi,

As it was said, COUNT is algebraic and should be fast, because it
forces combiner. You should make sure that combiner is really used
here. It can be disabled in some situations. I've encountered such
situations many times when a job is tooo heavy in case no combiner is
applied.

Ruslan

On Tue, Jul 3, 2012 at 1:35 AM, Subir S <su...@gmail.com> wrote:
> Right!!
>
> Since it is mentioned that job is hanging, wild guess is it must be
> 'group all'. How can that be confirmed?
>
> On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
>> group all uses a single reducer, but COUNT is algebraic, and as such, will
>> use combiners, so it is generally quite fast.
>>
>> 2012/7/2 Subir S <su...@gmail.com>
>>
>>> Group all - uses single reducer AFAIU. You can try to count per group
>>> and sum may be.
>>>
>>> You may also try with COUNT_STAR to include NULL fields.
>>>
>>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>>> > Hi all,
>>> >
>>> > I used to use the following pig script to do the counting of the
>>> > records.
>>> >
>>> > m_skill_group = group m_skills_filter by member_id;
>>> > grpd = group m_skill_group all;
>>> > cnt = foreach grpd generate COUNT(m_skill_group);
>>> >
>>> > cnt_filter = limit cnt 10;
>>> > dump cnt_filter;
>>> >
>>> >
>>> > but sometimes, when the records get larger, it takes lots of time and
>>> hang
>>> > up, and or die.
>>> > I thought counting should be simple enough, so what is the best way to
>>> do a
>>> > counting in pig?
>>> >
>>> > Thanks!
>>> >
>>> > Sheng
>>> >
>>>
>>



-- 
Best Regards,
Ruslan Al-Fakikh

Re: What is the best way to do counting in pig?

Posted by Subir S <su...@gmail.com>.
Right!!

Since it is mentioned that job is hanging, wild guess is it must be
'group all'. How can that be confirmed?

On 7/3/12, Jonathan Coveney <jc...@gmail.com> wrote:
> group all uses a single reducer, but COUNT is algebraic, and as such, will
> use combiners, so it is generally quite fast.
>
> 2012/7/2 Subir S <su...@gmail.com>
>
>> Group all - uses single reducer AFAIU. You can try to count per group
>> and sum may be.
>>
>> You may also try with COUNT_STAR to include NULL fields.
>>
>> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
>> > Hi all,
>> >
>> > I used to use the following pig script to do the counting of the
>> > records.
>> >
>> > m_skill_group = group m_skills_filter by member_id;
>> > grpd = group m_skill_group all;
>> > cnt = foreach grpd generate COUNT(m_skill_group);
>> >
>> > cnt_filter = limit cnt 10;
>> > dump cnt_filter;
>> >
>> >
>> > but sometimes, when the records get larger, it takes lots of time and
>> hang
>> > up, and or die.
>> > I thought counting should be simple enough, so what is the best way to
>> do a
>> > counting in pig?
>> >
>> > Thanks!
>> >
>> > Sheng
>> >
>>
>

Re: What is the best way to do counting in pig?

Posted by Jonathan Coveney <jc...@gmail.com>.
group all uses a single reducer, but COUNT is algebraic, and as such, will
use combiners, so it is generally quite fast.

2012/7/2 Subir S <su...@gmail.com>

> Group all - uses single reducer AFAIU. You can try to count per group
> and sum may be.
>
> You may also try with COUNT_STAR to include NULL fields.
>
> On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
> > Hi all,
> >
> > I used to use the following pig script to do the counting of the records.
> >
> > m_skill_group = group m_skills_filter by member_id;
> > grpd = group m_skill_group all;
> > cnt = foreach grpd generate COUNT(m_skill_group);
> >
> > cnt_filter = limit cnt 10;
> > dump cnt_filter;
> >
> >
> > but sometimes, when the records get larger, it takes lots of time and
> hang
> > up, and or die.
> > I thought counting should be simple enough, so what is the best way to
> do a
> > counting in pig?
> >
> > Thanks!
> >
> > Sheng
> >
>

Re: What is the best way to do counting in pig?

Posted by Subir S <su...@gmail.com>.
Group all - uses single reducer AFAIU. You can try to count per group
and sum may be.

You may also try with COUNT_STAR to include NULL fields.

On 7/3/12, Sheng Guo <en...@gmail.com> wrote:
> Hi all,
>
> I used to use the following pig script to do the counting of the records.
>
> m_skill_group = group m_skills_filter by member_id;
> grpd = group m_skill_group all;
> cnt = foreach grpd generate COUNT(m_skill_group);
>
> cnt_filter = limit cnt 10;
> dump cnt_filter;
>
>
> but sometimes, when the records get larger, it takes lots of time and hang
> up, and or die.
> I thought counting should be simple enough, so what is the best way to do a
> counting in pig?
>
> Thanks!
>
> Sheng
>