Posted to user@pig.apache.org by byambajargal <by...@gmail.com> on 2011/04/17 15:03:05 UTC
How to improve the performance of PIG Join
Hello ...
I have a cluster with 11 nodes, each with 16 GB RAM, a 6-core CPU, and a
1 TB HDD, and I use the Cloudera distribution CDH3B4 with Pig. I have two Pig
join queries: a parallel and a replicated version of the Pig join.
In theory the replicated join should be faster than the parallel join, but in
my case the parallel join is faster.
I am wondering why the replicated join is so slow. I want to improve
the performance of both queries. Could you check the details of the queries?
thanks
Byambajargal
ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
ISA_ANNO_T = GROUP ISA_ANNO ALL;
ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
dump ISA_ANNO_C;
HadoopVersion   PigVersion     UserId  StartedAt            FinishedAt           Features
0.20.2-CDH3B4   0.8.0-CDH3B4   haisen  2011-04-15 10:31:36  2011-04-15 10:43:22  HASH_JOIN,GROUP_BY

Success!

Job Stats (time in seconds):
JobId                  Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                  Feature            Outputs
job_201103122121_0084  277   10       15          5           11          417            351            379            ANNO,ISA_ANNO,REL      HASH_JOIN
job_201103122121_0085  631   1        10          5           7           242            242            242            ISA_ANNO_C,ISA_ANNO_T  GROUP_BY,COMBINER  hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868,
Input(s):
Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
Successfully read 442049697 records from:
"/datastorm/task3/obr_pm_annotation.txt"
Output(s):
Successfully stored 1 records (14 bytes) in:
"hdfs://haisen11:54310/tmp/temp281466632/tmp-171526868"
Counters:
Total records written : 1
Total bytes written : 14
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 41
Total records proactively spilled: 8781684
Job DAG:
job_201103122121_0084 -> job_201103122121_0085,
job_201103122121_0085
2011-04-15 10:43:22,403 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2011-04-15 10:43:22,419 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2011-04-15 10:43:22,419 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(844872046)
Using replicated version:

ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
ISA_ANNO_T = GROUP ISA_ANNO ALL;
ISA_ANNO_C = foreach ISA_ANNO_T generate COUNT($1);
dump ISA_ANNO_C;
HadoopVersion   PigVersion     UserId  StartedAt            FinishedAt           Features
0.20.2-CDH3B4   0.8.0-CDH3B4   haisen  2011-04-15 10:57:37  2011-04-15 11:26:32  REPLICATED_JOIN,GROUP_BY

Success!

Job Stats (time in seconds):
JobId                  Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                                Feature                            Outputs
job_201103122121_0088  11    0        11          5           9           0              0              0              REL                                  MAP_ONLY
job_201103122121_0089  266   1        151         101         123         1566           1566           1566           ANNO,ISA_ANNO,ISA_ANNO_C,ISA_ANNO_T  REPLICATED_JOIN,GROUP_BY,COMBINER  hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771,
Input(s):
Successfully read 442049697 records (17809735666 bytes) from:
"/datastorm/task3/obr_pm_annotation.txt"
Successfully read 24153638 records (691022731 bytes) from:
"/datastorm/task3/obs_relation.txt"
Output(s):
Successfully stored 1 records (14 bytes) in:
"hdfs://haisen11:54310/tmp/temp-1729753626/tmp-61569771"
Counters:
Total records written : 1
Total bytes written : 14
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_201103122121_0088 -> job_201103122121_0089,
job_201103122121_0089
2011-04-15 11:26:32,751 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2011-04-15 11:26:32,889 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2011-04-15 11:26:32,899 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(844872046)
ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id PARALLEL 10;
store ISA_ANNO into 'outputdel';
HadoopVersion   PigVersion     UserId  StartedAt            FinishedAt           Features
0.20.2-CDH3B4   0.8.0-CDH3B4   haisen  2011-04-15 16:08:52  2011-04-15 16:16:26  HASH_JOIN

Success!

Job Stats (time in seconds):
JobId                  Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias              Feature    Outputs
job_201103122121_0090  277   10       15          6           11          432            353            394            ANNO,ISA_ANNO,REL  HASH_JOIN  hdfs://haisen11:54310/user/haisen/outputdel,
Input(s):
Successfully read 24153638 records from: "/datastorm/task3/obs_relation.txt"
Successfully read 442049697 records from:
"/datastorm/task3/obr_pm_annotation.txt"
Output(s):
Successfully stored 844872046 records (34500196186 bytes) in:
"hdfs://haisen11:54310/user/haisen/outputdel"
Counters:
Total records written : 844872046
Total bytes written : 34500196186
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 41
Total records proactively spilled: 8537764
Job DAG:
job_201103122121_0090
2011-04-15 16:16:26,320 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
ANNO = load '/datastorm/task3/obr_pm_annotation.txt' using PigStorage(',')
    AS (element_id:long, concept_id:long);
REL = load '/datastorm/task3/obs_relation.txt' using PigStorage(',')
    AS (id:long, concept_id:long, parent_concept_id:long);
ISA_ANNO = join ANNO by concept_id, REL by concept_id USING 'replicated';
store ISA_ANNO into 'outputdel';
HadoopVersion   PigVersion     UserId  StartedAt            FinishedAt           Features
0.20.2-CDH3B4   0.8.0-CDH3B4   haisen  2011-04-15 16:32:20  2011-04-15 17:02:16  REPLICATED_JOIN

Success!

Job Stats (time in seconds):
JobId                  Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias          Feature                   Outputs
job_201103122121_0093  11    0        10          5           9           0              0              0              REL            MAP_ONLY
job_201103122121_0094  266   0        156         96          128         0              0              0              ANNO,ISA_ANNO  REPLICATED_JOIN,MAP_ONLY  hdfs://haisen11:54310/user/haisen/outputdel1,
Input(s):
Successfully read 24153638 records (691022731 bytes) from:
"/datastorm/task3/obs_relation.txt"
Successfully read 442049697 records (17809735666 bytes) from:
"/datastorm/task3/obr_pm_annotation.txt"
Output(s):
Successfully stored 844872046 records (34500196186 bytes) in:
"hdfs://haisen11:54310/user/haisen/outputdel1"
Counters:
Total records written : 844872046
Total bytes written : 34500196186
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_201103122121_0093 -> job_201103122121_0094,
job_201103122121_0094
2011-04-15 17:02:16,651 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
Re: What is implemented behind the PIG Joins
Posted by byambajav byambajargal <by...@gmail.com>.
Pig 0.8.1.
On Mon, Aug 22, 2011 at 10:58 PM, Thejas Nair <th...@hortonworks.com> wrote:
Re: What is implemented behind the PIG Joins
Posted by Thejas Nair <th...@hortonworks.com>.
Hi Byambajargal,
What version of pig does your distribution use ?
-Thejas
On 8/22/11 3:42 AM, byambaa wrote:
Re: What is implemented behind the PIG Joins
Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Let's say you are joining tables A, B, and C (listed in that order). The
default join just does a regular Hadoop MR join: read in all relations, tag
each row with source relation, emit with the key being the join key, collect
on the reducers.
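The default-join mechanics Dmitriy describes can be sketched outside Hadoop. The following Python sketch is illustrative only (not Pig's actual code): the "map phase" tags each row with its source relation under the join key, the grouping dict plays the role of the shuffle, and the "reduce phase" crosses the rows collected for each key.

```python
from collections import defaultdict
from itertools import product

def reduce_side_join(relations):
    """Simulate a reduce-side (hash) inner join.

    relations: list of relations, each a list of (key, value) tuples.
    Returns joined rows as (key, value_from_rel_0, value_from_rel_1, ...).
    """
    # Map phase: tag each row with the index of its source relation,
    # keyed by the join key (this is what the shuffle would move around).
    shuffled = defaultdict(lambda: [[] for _ in relations])
    for tag, relation in enumerate(relations):
        for key, value in relation:
            shuffled[key][tag].append(value)

    # Reduce phase: for each key, cross the rows collected from all relations.
    joined = []
    for key, buckets in shuffled.items():
        if all(buckets):  # inner join: the key must appear in every relation
            for combo in product(*buckets):
                joined.append((key,) + combo)
    return joined

A = [(1, 'a1'), (2, 'a2')]
B = [(1, 'b1'), (1, 'b2'), (3, 'b3')]
print(sorted(reduce_side_join([A, B])))  # only key 1 appears in both relations
```

Every row of every input crosses the network here, which is why the default join pays a full shuffle regardless of relation sizes.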
Replicated join is intended for small relations that fit in the memory of a
single map task. It works as follows: put all but the leftmost relation
into the distributed cache; read relation A in the mappers; in each mapper,
during initialization, load B and C from the distributed cache into memory;
then stream through the chunk of A allocated to each mapper, joining it with
the in-memory B and C.
If B and C are bigger than your available memory, this clearly doesn't work
very well and you need to do a regular join.
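The replicated (fragment-replicate) strategy can be sketched the same way. Again this is a hand-rolled illustration, not Pig's implementation: the small relations are indexed in memory as if loaded from the distributed cache, and the big relation is streamed through with no shuffle or reduce phase at all.

```python
from itertools import product

def replicated_join(big, *small_relations):
    """Fragment-replicate (map-side) inner join sketch.

    big: the streamed relation, a list of (key, value) tuples.
    small_relations: relations small enough to hold fully in memory.
    """
    # Per-map-task setup: build an in-memory hash index of each small
    # relation, as if it had been shipped via the distributed cache.
    indexes = []
    for rel in small_relations:
        idx = {}
        for key, value in rel:
            idx.setdefault(key, []).append(value)
        indexes.append(idx)

    # Map phase proper: stream the big relation and probe the indexes.
    out = []
    for key, value in big:
        matches = [idx.get(key, []) for idx in indexes]
        if all(matches):  # inner join: key must be present in every relation
            for combo in product(*matches):
                out.append((key, value) + combo)
    return out

A = [(1, 'a1'), (2, 'a2'), (3, 'a3')]   # the big, streamed relation
B = [(1, 'b1'), (3, 'b3')]              # small, held in memory
print(replicated_join(A, B))            # [(1, 'a1', 'b1'), (3, 'a3', 'b3')]
```

The catch, as the thread shows, is that every map task rebuilds these indexes, so the in-memory relations are re-read once per map task.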
D
On Mon, Aug 22, 2011 at 3:42 AM, byambaa <by...@gmail.com> wrote:
What is implemented behind the PIG Joins
Posted by byambaa <by...@gmail.com>.
Hello
I have a cluster with 11 nodes, each with 16 GB RAM, a 6-core CPU, and a
1 TB HDD, and I am using the Cloudera distribution CDH3B4 with Pig. I have two Pig
join queries, a parallel and a replicated version of the Pig join, as well as MapReduce reduce-side and map-side joins.
In theory the replicated join should be faster than the parallel join, but in
my case the parallel join is faster.
I have some questions:
1. I am wondering why the replicated join is so slow. How does it work, and what is behind the replicated join?
2. The MR reduce-side join was faster than the parallel Pig join. What is implemented behind the parallel Pig join? I guess Pig also implements an MR reduce-side join.
Could you explain how the Pig joins work and what runs behind the Pig scripts?
Dataset (size)                Replicated Join in HDFS  Replicated Join in HBase  MR Reduce-side join  MR Joins (Singleton pattern)
obr_wp_annotation (1786 MB)   29 sec                   50 sec                    36 sec               19 sec
obr_ct_annotation (5916 MB)   799 sec                  523 sec                   108 sec              69 sec
obr_pm_annotation (16983 MB)  1794 sec                 707 sec                   248 sec              138 sec

The relation file is 659 MB.
Thank you very much
Byambajargal
Re: How to improve the performance of PIG Join
Posted by Thejas M Nair <te...@yahoo-inc.com>.
Here is the (theoretical) rule of thumb for replicated join:
For a replicated join to perform significantly better than the default join, the size of the replicated input should be smaller than the block size (or pig.maxCombinedSplitSize, if the property pig.splitCombination=true and it is larger than the block size).
This is because the number of map tasks started equals the number of blocks (or size/pig.maxCombinedSplitSize) in the left-side input of the replicated join. Each of these map tasks will read the replicated input. If the replicated input is a few times larger than the block size, using a replicated join will not save on IO/(de)serialization costs.
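This rule of thumb can be checked back-of-the-envelope with the byte counts from the replicated-join run earlier in the thread; the calculation below is an illustration of the rule, not output from Pig.

```python
# Back-of-the-envelope check of the rule of thumb, using the byte counts
# reported by the replicated-join run (obs_relation.txt replicated into
# 266 map tasks over obr_pm_annotation.txt).
replicated_input = 691_022_731      # bytes in obs_relation.txt
left_input       = 17_809_735_666   # bytes in obr_pm_annotation.txt
num_maps         = 266              # map tasks over the left-side input

# Every map task re-reads and deserializes the whole replicated input.
replicated_bytes_read = replicated_input * num_maps
print(f"replicated data read: {replicated_bytes_read / 1e9:.0f} GB")  # ~184 GB
print(f"left input read once: {left_input / 1e9:.1f} GB")             # ~17.8 GB
```

The replicated input is re-read roughly ten times the volume of the big input, so the shuffle the replicated join avoids is cheaper than the repeated reads it introduces.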
-Thejas
On 4/17/11 6:03 AM, "byambajargal" <by...@gmail.com> wrote:
Re: How to improve the performance of PIG Join
Posted by Thejas M Nair <te...@yahoo-inc.com>.
For the default join (hash join):
- Increasing the parallelism of the default join should speed it up.
- Put the table that has a large number of tuples per key as the last table
in the join. (Yes, this happens to be the opposite of the recommendation for
replicated join!) See
http://pig.apache.org/docs/r0.8.0/cookbook.html#Take+Advantage+of+Join+Optimizations
- http://pig.apache.org/docs/r0.8.0/cookbook.html#Project+Early+and+Often
For the replicated join:
- I believe the reason the replicated join performs worse than the default
join is the large number of maps combined with the large size of the
replicated file. Each map task ends up reading and deserializing the
replicated file (obs_relation.txt), and that usually takes the bulk of the
runtime. In this case (691 MB x 266 maps =~) 183 GB of replicated input data
will be read and deserialized across all the map tasks. This is actually very
large compared to the size of the larger input (17 GB).
To reduce the number of maps, you can use the feature introduced in
https://issues.apache.org/jira/browse/PIG-1518 : ensure that you have the
property pig.splitCombination=true, and set pig.maxCombinedSplitSize=X, where X
= size_of_obr_pm_annotation.txt/number-of-map-slots. This will ensure that
all cluster slots are used and you don't have too many map tasks.
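As a concrete sketch of that sizing: the map-slot count below is hypothetical (11 nodes with an assumed 6 map slots each), so treat the numbers as an illustration of the formula rather than a recommended setting.

```python
# Hypothetical sizing of pig.maxCombinedSplitSize per the advice above.
# The slot count is an assumption: 11 nodes * 6 map slots each.
input_size = 17_809_735_666          # bytes in obr_pm_annotation.txt
map_slots  = 66                      # hypothetical total map slots
split_size = input_size // map_slots # candidate pig.maxCombinedSplitSize

# One wave of ~66 maps fills every slot instead of launching 266 maps.
print(split_size)
# Each map still re-reads the 691 MB replicated input, but the total
# replicated read drops from ~184 GB (266 maps) to ~46 GB (66 maps).
```

In the script itself this would be applied with `set pig.splitCombination true;` and `set pig.maxCombinedSplitSize <split_size>;` before the join.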
-Thejas
On 4/17/11 6:03 AM, "byambajargal" <by...@gmail.com> wrote: