Posted to user@pig.apache.org by byambajargal <by...@gmail.com> on 2011/04/17 15:03:05 UTC

How to improve the performance of Pig joins

Hello ...
I have a cluster with 11 nodes, each with 16 GB RAM, a 6-core CPU, and a
1 TB HDD, and I use the Cloudera distribution CDH3B4 with Pig. I have two
Pig join queries, a parallel and a replicated version of the same join.

Theoretically a replicated join should be faster than a parallel join, but
in my case the parallel join is faster.
I am wondering why the replicated join is so slow. I want to improve the
performance of both queries. Could you check the details of the queries?

thanks

Byambajargal


ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
ISA_ANNO_T = GROUP ISA_ANNO ALL;
ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
dump ISA_ANNO_C;

HadoopVersion   PigVersion      UserId  StartedAt               FinishedAt              Features
0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 10:31:36     2011-04-15 10:43:22     HASH_JOIN,GROUP_BY

Success!

Job Stats (time in seconds):
JobId                   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias                   Feature                 Outputs
job_201103122121_0084   277     10      15              5               11              417             351             379             ANNO,ISA_ANNO,REL       HASH_JOIN
job_201103122121_0085   631     1       10              5               7               242             242             242             ISA_ANNO_C,ISA_ANNO_T   GROUP_BY,COMBINER       hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868,

Input(s):
Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
Successfully read 442049697 records from: 
"/datastorm/task3/obr_pm_annotation.txt"

Output(s):
Successfully stored 1 records (14 bytes) in: 
"hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868"

Counters:
Total records written : 1
Total bytes written : 14
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 41
Total records proactively spilled: 8781684

Job DAG:
job_201103122121_0084   ->      job_201103122121_0085,
job_201103122121_0085


2011-04-15 10:43:22,403 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2011-04-15 10:43:22,419 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2011-04-15 10:43:22,419 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(844872046)


Using the replicated version:

ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
ISA_ANNO_T = GROUP ISA_ANNO ALL;
ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
dump ISA_ANNO_C;
HadoopVersion   PigVersion      UserId  StartedAt               FinishedAt              Features
0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 10:57:37     2011-04-15 11:26:32     REPLICATED_JOIN,GROUP_BY

Success!

Job Stats (time in seconds):
JobId                   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias                                   Feature                                 Outputs
job_201103122121_0088   11      0       11              5               9               0               0               0               REL                                     MAP_ONLY
job_201103122121_0089   266     1       151             101             123             1566            1566            1566            ANNO,ISA_ANNO,ISA_ANNO_C,ISA_ANNO_T     REPLICATED_JOIN,GROUP_BY,COMBINER       hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771,

Input(s):
Successfully read 442049697 records (17809735666 bytes) from: 
"/datastorm/task3/obr_pm_annotation.txt"
Successfully read 24153638 records (691022731 bytes) from: 
"/datastorm/task3/obs_relation.txt"

Output(s):
Successfully stored 1 records (14 bytes) in: 
"hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771"

Counters:
Total records written : 1
Total bytes written : 14
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201103122121_0088   ->      job_201103122121_0089,
job_201103122121_0089


2011-04-15 11:26:32,751 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2011-04-15 11:26:32,889 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2011-04-15 11:26:32,899 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(844872046)

ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
store ISA_ANNO into 'outputdel';

HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      
Features
0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 16:08:52     
2011-04-15 16:16:26     HASH_JOIN

Success!

Job Stats (time in seconds):
JobId                   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias                   Feature         Outputs
job_201103122121_0090   277     10      15              6               11              432             353             394             ANNO,ISA_ANNO,REL       HASH_JOIN       hdfs://haisen11:54310/user/haisen/outputdel,

Input(s):
Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
Successfully read 442049697 records from: 
"/datastorm/task3/obr_pm_annotation.txt"

Output(s):
Successfully stored 844872046 records (34500196186 bytes) in: 
"hdfs://haisen11:54310/user/haisen/outputdel"

Counters:
Total records written : 844872046
Total bytes written : 34500196186
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 41
Total records proactively spilled: 8537764

Job DAG:
job_201103122121_0090

2011-04-15 16:16:26,320 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!


ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
store ISA_ANNO into 'outputdel';


HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      
Features
0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 16:32:20     
2011-04-15 17:02:16     REPLICATED_JOIN

Success!

Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTime      AvgMapTime      
MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201103122121_0093   11      0       10      5       9       0       
0       0       REL     MAP_ONLY
job_201103122121_0094   266     0       156     96      128     0       
0       0       ANNO,ISA_ANNO   REPLICATED_JOIN,MAP_ONLY        
hdfs://haisen11:54310/user/haisen/outputdel1,

Input(s):
Successfully read 24153638 records (691022731 bytes) from: 
"/datastorm/task3/obs_relation.txt"
Successfully read 442049697 records (17809735666 bytes) from: 
"/datastorm/task3/obr_pm_annotation.txt"

Output(s):
Successfully stored 844872046 records (34500196186 bytes) in: 
"hdfs://haisen11:54310/user/haisen/outputdel1"

Counters:
Total records written : 844872046
Total bytes written : 34500196186
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201103122121_0093   ->      job_201103122121_0094,
job_201103122121_0094


2011-04-15 17:02:16,651 [main] INFO  
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher 
- Success!








Re: What is implemented behind the PIG Joins

Posted by byambajav byambajargal <by...@gmail.com>.
Pig 0.8.1.

On Mon, Aug 22, 2011 at 10:58 PM, Thejas Nair <th...@hortonworks.com> wrote:

> Hi Byambajargal,
> What version of pig does your distribution use ?
> -Thejas
>
>
> On 8/22/11 3:42 AM, byambaa wrote:

Re: What is implemented behind the PIG Joins

Posted by Thejas Nair <th...@hortonworks.com>.
Hi Byambajargal,
What version of pig does your distribution use ?
-Thejas

On 8/22/11 3:42 AM, byambaa wrote:


Re: What is implemented behind the PIG Joins

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Let's say you are joining tables A, B, and C (listed in that order). The
default join just does a regular Hadoop MR join: read in all relations, tag
each row with source relation, emit with the key being the join key, collect
on the reducers.
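As a rough illustration, that reduce-side scheme can be simulated in a few lines of Python. This is a hypothetical single-process sketch of the general MR pattern, not Pig's actual implementation:

```python
from collections import defaultdict
from itertools import product

def reduce_side_join(*relations):
    """Simulate an MR reduce-side (default) join on the first field.

    Map phase: tag each row with its source relation and emit it under
    its join key.  Shuffle: group rows by key.  Reduce phase: for each
    key present in every relation, emit the cross product (inner join).
    """
    by_key = defaultdict(lambda: [[] for _ in relations])  # shuffle buffers
    for tag, rel in enumerate(relations):                  # map + shuffle
        for row in rel:
            by_key[row[0]][tag].append(row)
    out = []
    for key, groups in by_key.items():                     # reduce
        if all(groups):                                    # key in every input
            for combo in product(*groups):
                out.append(sum(combo, ()))                 # concatenate tuples
    return out

A = [(1, 'a'), (2, 'b')]
B = [(1, 'x'), (1, 'y'), (3, 'z')]
print(reduce_side_join(A, B))   # [(1, 'a', 1, 'x'), (1, 'a', 1, 'y')]
```

Keys 2 and 3 drop out because an inner join requires the key on both sides.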

Replicated join is intended for small relations that fit in the memory of a
single map task. It works as follows: put all but the leftmost relation
into the distributed cache; read relation A in the mappers; in each mapper,
during initialization, load B and C from the distributed cache into memory;
then stream through the chunk of A allocated to each mapper, joining it with
the in-memory B and C.

If B and C are bigger than your available memory, this clearly doesn't work
very well and you need to do a regular join.
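The fragment-replicate scheme described above can likewise be sketched in Python. Again a hypothetical simulation under inner-join semantics; in real Pig the small relations are shipped via the distributed cache and loaded once per map task:

```python
from collections import defaultdict

def replicated_join(big, *small):
    """Simulate Pig's replicated (fragment-replicate) join on field 0.

    Build phase: load every relation except the leftmost into an
    in-memory hash table (these must fit in a map task's RAM).
    Probe phase: stream the big relation through once, map-side only,
    probing each row against the in-memory tables.
    """
    tables = []
    for rel in small:                        # build phase
        t = defaultdict(list)
        for row in rel:
            t[row[0]].append(row)
        tables.append(t)
    out = []
    for row in big:                          # probe phase, one pass
        matches = [t.get(row[0], []) for t in tables]
        if all(matches):
            stack = [row]
            for group in matches:            # join against each small table
                stack = [acc + m for acc in stack for m in group]
            out.extend(stack)
    return out

A = [(1, 'a'), (2, 'b'), (1, 'c')]
B = [(1, 'x'), (3, 'z')]
print(replicated_join(A, B))    # [(1, 'a', 1, 'x'), (1, 'c', 1, 'x')]
```

No shuffle or reduce phase is needed, which is exactly why it wins when the small side truly is small.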

D

On Mon, Aug 22, 2011 at 3:42 AM, byambaa <by...@gmail.com> wrote:


What is implemented behind the PIG Joins

Posted by byambaa <by...@gmail.com>.
Hello
I have a cluster with 11 nodes, each with 16 GB RAM, a 6-core CPU, and a
1 TB HDD, and I am using the Cloudera distribution CDH3B4 with Pig. I have two
Pig join queries, a parallel and a replicated version of the Pig join, as well
as MapReduce reduce-side and map-side joins.

Theoretically a replicated join should be faster than a parallel join, but in
my case the parallel join is faster.
I have some questions:

1. I am wondering why the replicated join is so slow. How does it work, and what is behind the replicated join?
2. The MR reduce-side join was faster than the parallel Pig join. What is implemented behind the parallel Pig join? I guess Pig also implements an MR reduce-side join.

Could you explain how the Pig joins work and what runs behind the Pig scripts?


Dataset (size)                  Replicated Join in HDFS   Replicated Join in HBase   MR Reduce-side join   MR Joins (Singleton pattern)
obr_wp_annotation (1786 MB)     29 sec                    50 sec                     36 sec                19 sec
obr_ct_annotation (5916 MB)     799 sec                   523 sec                    108 sec               69 sec
obr_pm_annotation (16983 MB)    1794 sec                  707 sec                    248 sec               138 sec

the relation file is 659MB

Thank you very much

Byambajargal


Re: How to improve the performance of Pig joins

Posted by Thejas M Nair <te...@yahoo-inc.com>.
Here is a (theoretical) rule of thumb for replicated join:
for a replicated join to perform significantly better than the default join, the replicated input should be smaller than the block size (or smaller than pig.maxCombinedSplitSize, if pig.splitCombination=true and that value is larger than the block size).

This is because the number of map tasks started equals the number of blocks (or size/pig.maxCombinedSplitSize) in the left-side input of the replicated join, and each of those map tasks reads the entire replicated input. If the replicated input is more than a few times larger than the block size, using a replicated join will not save on IO/(de)serialization costs.
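Plugging this thread's numbers into the rule makes the blow-up concrete. The sketch below assumes a 64 MB HDFS block size (an assumption, not stated in the thread), which happens to reproduce the 266 map tasks seen in the replicated run:

```python
def replicated_read_cost(left_size, replicated_size, split_size):
    """Total bytes of the replicated input read across all map tasks.

    One map task is started per split of the left (streamed) input,
    and every map task re-reads the whole replicated input.
    """
    maps = -(-left_size // split_size)        # ceiling division
    return maps, maps * replicated_size

# Numbers from this thread: 17.8 GB left input, 691 MB replicated input.
left = 17_809_735_666
repl = 691_022_731
maps, total = replicated_read_cost(left, repl, 64 * 1024**2)  # 64 MB splits
print(maps, round(total / 1024**3))   # 266 maps, ~171 GiB of redundant reads
```

With the replicated input ~10x the block size, each map pays far more to re-read the "small" side than it saves by skipping the shuffle.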

-Thejas



On 4/18/11 4:33 PM, "Thejas M Nair" <te...@yahoo-inc.com> wrote:




On 4/17/11 6:03 AM, "byambajargal" <by...@gmail.com> wrote:




Re: How to improve the performance of Pig joins

Posted by Thejas M Nair <te...@yahoo-inc.com>.
For the default join (hash join) -
- Increasing the parallelism of the default join should speed it up.
- Put the table that has the largest number of tuples per key as the last table
in the join. (Yes, this happens to be the opposite of the recommendation for
replicated join!) See -
http://pig.apache.org/docs/r0.8.0/cookbook.html#Take+Advantage+of+Join+Optimizations
- http://pig.apache.org/docs/r0.8.0/cookbook.html#Project+Early+and+Often

For the replicated join -
- I believe the reason the replicated join performs worse than the default
join is the large number of maps combined with the large size of the
replicated file. Each map task ends up reading and deserializing the
replicated file (obs_relation.txt), and that usually takes the bulk of the
runtime. In this case (691 MB x 266 maps =~) 183 GB of replicated input data
will be read and deserialized across all the map tasks; by comparison, the
larger input (17 GB) is actually very small.
To reduce the number of maps, you can use the feature introduced in
https://issues.apache.org/jira/browse/PIG-1518 : ensure that you have the
property pig.splitCombination=true, and set pig.maxCombinedSplitSize=X, where X
= size_of_obr_pm_annotation.txt/number-of-map-slots. This will ensure that
all cluster slots are used and you don't have too many map tasks.
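That X can be computed directly. The 66 map slots below (11 nodes x 6 slots each) are a hypothetical figure for this cluster, not something stated in the thread:

```python
def combined_split_size(input_size, map_slots):
    """Suggested pig.maxCombinedSplitSize: split the left input so it
    yields roughly one map task per available map slot."""
    return -(-input_size // map_slots)    # ceiling division

# Hypothetical slot count: 11 worker nodes x 6 map slots each.
size_of_obr_pm_annotation = 17_809_735_666   # bytes, from the job stats above
slots = 11 * 6
x = combined_split_size(size_of_obr_pm_annotation, slots)
print(x)   # 269844480 bytes (~257 MiB) per combined split
```

With splits of this size the replicated run would start ~66 maps instead of 266, cutting the redundant reads of the replicated file by roughly 4x.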

-Thejas



On 4/17/11 6:03 AM, "byambajargal" <by...@gmail.com> wrote:

> Hello ...
> I have a cluster with 11 nodes  each of them have 16 GB RAM, 6 core CPU,
> ! TB HDD and i use cloudera distribution CHD4b with Pig. I have two Pig
> Join queries  which are a Parallel and a Replicated version of pig Join.
> 
> Theoretically Replicated Join could be faster than Parallel join but in
> my case Parallel is faster.
> I am wondering why the replicated join is so slowly. i wont to improve
> the performance of both query. Could you check the detail of the queries.
> 
> thanks
> 
> Byambajargal
> 
> 
> ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using
> PigStorage(',') AS (element_id:long,concept_id:long); ;REL = load
> '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS
> (id:long,concept_id:long,parent_concept_id:long);ISA_ANNO = join ANNO by
> concept_id,REL by concept_id*PARALLEL 10*;ISA_ANNO_T = GROUP ISA_ANNO
> ALL;ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1); dump ISA_ANNO_C
> 
> HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt
> Features
> 0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 10:31:36
> 2011-04-15 10:43:22
> HASH_JOIN,GROU                                                       P_BY
> 
> Success!
> 
> Job Stats (time in seconds):
> JobId                               Maps    Reduces
> MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime
> MinReduceTime    AvgReduceTime    Alias              Feature Outputs
> job_201103122121_0084   277     10                          15
>         5                           11                        417
>         351                           379     ANNO,ISA_ANNO,
> REL     HASH_JOIN
> job_201103122121_0085   631     1                            10
>           5                            7                        242
>             242                          242     ISA_ANNO_C,ISA_ANNO_T
> GROUP_BY,COMBINER
> hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868,
> 
> Input(s):
> Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
> Successfully read 442049697 records from:
> "/datastorm/task3/obr_pm_annotation.txt"
> 
> Output(s):
> Successfully stored 1 records (14 bytes) in:
> "hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868"
> 
> Counters:
> Total records written : 1
> Total bytes written : 14
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 41
> Total records proactively spilled: 8781684
> 
> Job DAG:
> job_201103122121_0084   ->      job_201103122121_0085,
> job_201103122121_0085
> 
> 
> 2011-04-15 10:43:22,403 [main] INFO
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapR
> educeLauncher - Success!
> 2011-04-15 10:43:22,419 [main] INFO
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total
> inp                                                       ut paths to
> process : 1
> 2011-04-15 10:43:22,419 [main] INFO
> org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil -
> T                                                       otal input paths
> to process : 1
> (844872046)
> 
> 
> *Using replicated version*
> *ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using
> PigStorage(',') AS (element_id:long,concept_id:long); ;REL = load
> '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS
> (id:long,concept_id:long,parent_concept_id:long);ISA_ANNO = join ANNO by
> concept_id,REL by concept_idUSING 'replicated';ISA_ANNO_T = GROUP
> ISA_ANNO ALL;ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1); dump
> ISA_ANNO_C*
> **
> HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt
> Features
> 0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 10:57:37
> 2011-04-15 11:26:32
> REPLICATED_JOI   
> N,GROUP_BY
> 
> Success!
> 
> Job Stats (time in seconds):
> JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
> job_201103122121_0088   11      0       11      5       9       0       0       0       REL     MAP_ONLY
> job_201103122121_0089   266     1       151     101     123     1566    1566    1566    ANNO,ISA_ANNO,ISA_ANNO_C,ISA_ANNO_T     REPLICATED_JOIN,GROUP_BY,COMBINER       hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771,
> 
> Input(s):
> Successfully read 442049697 records (17809735666 bytes) from:
> "/datastorm/task3/obr_pm_annotation.txt"
> Successfully read 24153638 records (691022731 bytes) from:
> "/datastorm/task3/obs_relation.txt"
> 
> Output(s):
> Successfully stored 1 records (14 bytes) in:
> "hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771"
> 
> Counters:
> Total records written : 1
> Total bytes written : 14
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 0
> Total records proactively spilled: 0
> 
> Job DAG:
> job_201103122121_0088   ->      job_201103122121_0089,
> job_201103122121_0089
> 
> 
> 2011-04-15 11:26:32,751 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
> 2011-04-15 11:26:32,889 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
> 2011-04-15 11:26:32,899 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
> (844872046)
> 
> *ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',') AS (element_id:long, concept_id:long);
> REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS (id:long, concept_id:long, parent_concept_id:long);
> ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
> store ISA_ANNO into 'outputdel';*
> 
> HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt
> Features
> 0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 16:08:52
> 2011-04-15 16:16:26     HASH_JOIN
> 
> Success!
> 
> Job Stats (time in seconds):
> JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
> job_201103122121_0090   277     10      15      6       11      432     353     394     ANNO,ISA_ANNO,REL       HASH_JOIN       hdfs://haisen11:54310/user/haisen/outputdel,
> 
> Input(s):
> Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
> Successfully read 442049697 records from:
> "/datastorm/task3/obr_pm_annotation.txt"
> 
> Output(s):
> Successfully stored 844872046 records (34500196186 bytes) in:
> "hdfs://haisen11:54310/user/haisen/outputdel"
> 
> Counters:
> Total records written : 844872046
> Total bytes written : 34500196186
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 41
> Total records proactively spilled: 8537764
> 
> Job DAG:
> job_201103122121_0090
> 
> 2011-04-15 16:16:26,320 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
> 
> 
> *ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',') AS (element_id:long, concept_id:long);
> REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',') AS (id:long, concept_id:long, parent_concept_id:long);
> ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
> store ISA_ANNO into 'outputdel';*
> 
> 
> HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt
> Features
> 0.20.2-CDH3B4   0.8.0-CDH3B4    haisen  2011-04-15 16:32:20
> 2011-04-15 17:02:16     REPLICATED_JOIN
> 
> Success!
> 
> Job Stats (time in seconds):
> JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime
> MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
> job_201103122121_0093   11      0       10      5       9       0       0       0       REL     MAP_ONLY
> job_201103122121_0094   266     0       156     96      128     0       0       0       ANNO,ISA_ANNO   REPLICATED_JOIN,MAP_ONLY        hdfs://haisen11:54310/user/haisen/outputdel1,
> 
> Input(s):
> Successfully read 24153638 records (691022731 bytes) from:
> "/datastorm/task3/obs_relation.txt"
> Successfully read 442049697 records (17809735666 bytes) from:
> "/datastorm/task3/obr_pm_annotation.txt"
> 
> Output(s):
> Successfully stored 844872046 records (34500196186 bytes) in:
> "hdfs://haisen11:54310/user/haisen/outputdel1"
> 
> Counters:
> Total records written : 844872046
> Total bytes written : 34500196186
> Spillable Memory Manager spill count : 0
> Total bags proactively spilled: 0
> Total records proactively spilled: 0
> 
> Job DAG:
> job_201103122121_0093   ->      job_201103122121_0094,
> job_201103122121_0094
> 
> 
> 2011-04-15 17:02:16,651 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
> 
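One observation on the replicated runs above: a fragment-replicate join loads the right-most relation entirely into each map task's memory, so it only wins when that side is genuinely small. REL is ~691 MB on disk and grows further once deserialized into Pig tuples, which is consistent with the long map times in job_201103122121_0089 and job_201103122121_0094. A sketch of shrinking the replicated side before the join (the REL_SLIM projection is my suggestion, not part of the original script, and is only valid here because the query needs nothing from REL beyond the join key):

```pig
-- Paths, schemas, and alias names taken from the original script.
ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
       AS (element_id:long, concept_id:long);
REL  = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
       AS (id:long, concept_id:long, parent_concept_id:long);

-- Project REL down to the join key before it is replicated to every
-- mapper; this shrinks the in-memory hash table substantially.
REL_SLIM = foreach REL generate concept_id;

ISA_ANNO   = join ANNO by concept_id, REL_SLIM by concept_id USING 'replicated';
ISA_ANNO_T = GROUP ISA_ANNO ALL;
ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
dump ISA_ANNO_C;
```

If REL cannot be trimmed to fit comfortably in mapper memory, the default hash join with PARALLEL is the reasonable choice, which matches the timings you measured.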


--