Posted to user@pig.apache.org by Shubham Chopra <sh...@gmail.com> on 2011/06/16 20:13:08 UTC

Accumulative reducer v/s Combiner usage

Hi,

My pig query is roughly the following:

register some_lib.jar
a = load 'somefile' using CustomUDF();
b = foreach a generate CustomProjectionUDF();
c = foreach b generate var1, var2, var3;
d = group b by (var1, var2);
e = foreach d generate flatten(group), SUM(c.var1), SUM(c.var2),
SUM(c.var3);
store e into 'file';

I was expecting to see the combiner being used, but the optimizer did not
use a combiner. The following is the output I see (version 0.8.1)
INFO executionengine.HExecutionEngine: pig.usenewlogicalplan is set to true. New logical plan will be used.
INFO executionengine.HExecutionEngine: (Name: agg: Store(hdfs://machine:9000/SomeFile:PigStorage('|')) - scope-4353 Operator Key: scope-4353)
INFO mapReduceLayer.MRCompiler: File concatenation threshold: 100 optimistic? false
INFO mapReduceLayer.MultiQueryOptimizer: MR plan size before optimization: 1
INFO mapReduceLayer.MultiQueryOptimizer: MR plan size after optimization: 1
INFO mapReduceLayer.AccumulatorOptimizer: Reducer is to run in accumulative mode.
INFO pigstats.ScriptState: Pig script settings are added to the job
INFO mapReduceLayer.JobControlCompiler: BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=611579950
INFO mapReduceLayer.JobControlCompiler: Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
INFO mapReduceLayer.MapReduceLauncher: 1 map-reduce job(s) waiting for submission.

How can I force the combiner to be used here?

Thanks,
Shubham.

Re: Accumulative reducer v/s Combiner usage

Posted by Shubham Chopra <sh...@gmail.com>.
Thanks! I guess it's time to move to the trunk then!

~Shubham.


Re: Accumulative reducer v/s Combiner usage

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
I've confirmed this behavior in 0.8.1, and that it's fixed in
trunk (I didn't check 0.9).



Re: Accumulative reducer v/s Combiner usage

Posted by Shubham Chopra <sh...@gmail.com>.
Hi Daniel,

I am seeing this behaviour with 0.8.1.

Consider an input file named a containing the following:
1|2|3
3||4

I start Pig in local mode and then run the following script:
a = load 'a' using PigStorage('|');
b = group a by $0;
c = foreach b generate 'Test' as name, flatten(group), SUM(a.$0) as s0,
SUM(a.$1) as s1, SUM(a.$2) as s2;
dump c;

The above script does not use the combiner.

However, the following script does:
a = load 'a' using PigStorage('|');
b = group a by $0;
c = foreach b generate flatten(group), SUM(a.$0) as s0, SUM(a.$1) as s1,
SUM(a.$2) as s2;
dump c;

This script uses the combiner.

I traced the difference to whether or not a constant appears in the foreach
statement. Is this expected behavior? I had assumed the decision to use a
combiner depends on the UDFs implementing the Algebraic interface. Why does
projecting a constant stop the combiner from being used?

Thanks,
Shubham.
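
For readers less familiar with the algebraic contract mentioned above, here is a minimal sketch of why a function like SUM can be pushed into a combiner. It is a plain Python stand-in, not Pig code: the names initial, intermed and final are only loosely modelled on the three stages of Pig's Algebraic interface, and run, ROWS and the "splits" are hypothetical helpers invented for this illustration. It assumes empty fields load as nulls and that SUM skips nulls.

```python
from collections import defaultdict

# Rows from the sample file 'a' above; the empty field is treated as a null.
ROWS = [(1, 2, 3), (3, None, 4)]


def initial(value):
    """Map side: a single value is already a partial sum (null stays null)."""
    return value


def intermed(partials):
    """Combine step: merge partial sums, skipping nulls; null if all are null."""
    present = [p for p in partials if p is not None]
    return sum(present) if present else None


def final(partials):
    """Reduce side: the same merge, producing the final SUM."""
    return intermed(partials)


def run(splits, use_combiner):
    """Group by the first field and SUM every column.

    Returns the per-key sums and the number of records that crossed the
    (simulated) shuffle between map and reduce.
    """
    shuffled = defaultdict(list)
    for split in splits:
        local = defaultdict(list)
        for row in split:
            local[row[0]].append(tuple(initial(v) for v in row))
        for key, tuples in local.items():
            if use_combiner:
                # Collapse this split's records for the key into one record.
                tuples = [tuple(intermed(col) for col in zip(*tuples))]
            shuffled[key].extend(tuples)
    records_shuffled = sum(len(t) for t in shuffled.values())
    result = {
        key: tuple(final(col) for col in zip(*tuples))
        for key, tuples in shuffled.items()
    }
    return result, records_shuffled


if __name__ == "__main__":
    print(run([ROWS], use_combiner=True))
    print(run([ROWS], use_combiner=False))
```

Because merging partial sums gives the same answer as summing everything at once, the result is identical with or without the combine step; only the number of records sent to the reducer changes. Nothing in this decomposition depends on a constant being projected alongside the aggregates, which is consistent with the behavior in 0.8.1 being treated as a bug rather than a design limitation.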


Re: Accumulative reducer v/s Combiner usage

Posted by Daniel Dai <ji...@yahoo-inc.com>.
Do you mean "d = group c by (var1, var2);"? If so, I can see the
combiner being used. Which version of Pig are you using?

Daniel
