You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2008/11/24 22:45:39 UTC

[Pig Wiki] Update of "PigMix" by AlanGates

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by AlanGates:
http://wiki.apache.org/pig/PigMix

New page:
= Pig Mix =

PigMix is a set of queries used test pig performance from release to release.  There are queries that test latency (how long does it
take to run this query?), and queries that test scalability (how many fields or records can pig handle before it fails?).  In addition
it includes a set of map reduce java programs to run equivalent map reduce jobs directly.  These will be used to test the performance
gap between direct use of map reduce and using pig.

== Runs ==

== November 19, 2008 ==
PigMix was run on a 25 machine hadoop cluster.  The cluster was running hadoop version 0.18.1.  The tests were run against two
versions of pig:  top of trunk, and top of types branch both as of Nov 21 2008.

So far only the latency tests have been run, not the scalability tests.  Equivalent map reduce programs have not yet been run.

The tests were run three times for each version and the results averaged.

|| Query || Top of Trunk || Top of Types Branch ||
|| L1 explode || 261 || 283 ||
|| L2 fr join || 1665 || 253 ||
|| L3 join || 1912 || 320 ||
|| L4 distinct agg || 254 || 193 ||
|| L5 anti-join || 1535 || 281 ||
|| L6 large group by key || 294 || 226 ||
|| L7 nested split || 243 || 204 ||
|| L8 group all || 462 ||  194 ||
|| L9 order by 1 field || 5294 || 867 ||
|| L10 order by multiple fields || 1403 || 565 ||
|| L11 distinct + union || 316 || 255 ||
|| L12 multi-store || fails || 781 ||

== Features Tested ==
Based on a sample of user queries, PigMix includes tests for the following features.
 1. Data with many fields, but only a few are used.
 1. Reading data from maps.
 1. Use of bincond and arithmetic operators.
 1. Exploding nested data.
 1. Load bzip2 data
 1. Load uncompressed data
 1. join with one table small enough to fit into a fragment and replicate algorithm.
 1. join where tables are sorted and partitioned on the same key
 1. Do a cogroup that is not immediately followed by a flatten (that is, use cogroup for something other than a straight forward join).
 1. group by with only algebraic udfs that has nested plan (distinct aggs basically).
 1. foreachs with nested plans including filter and implicit splits.
 1. group by where the key accounts for a large portion of the record.
 1. group all
 1. union plus distinct
 1. order by
 1. multi-store query (that is, a query where data is scanned once, then split and grouped different ways).

The data is generated so that it has a zipf type distribution for the group by and join keys, as this models most human generated
data.
Some other fields are generated using a uniform data distribution.

Scalability tests test the following:
 1. Join of very large data sets.
 1. Grouping of very large data set.
 1. Query with a very wide (500+ fields) row.
 1. Loading many data sets together in one load

== Proposed Data ==
Initially, four data sets have been created.  The first, `page_views`, is 10 million rows in size, with a schema of:

|| '''Name''' || '''Type''' || '''Average Length''' || '''Cardinality''' || '''Distribution''' || '''Percent Null''' ||
|| user || string || 20 || 1.6M || zipf || 7 ||
|| action || int || X || 2 || uniform || 0 ||
|| timespent || int || X || 20 || zipf || 0 ||
|| query_term || string || 10 || 1.8M || zipf || 20 ||
|| ip_addr || long || X || 1M || zipf || 0 ||
|| timestamp || long || X || 86400 || uniform || 0 ||
|| estimated_revenue || double || X || 100k || zipf || 5 ||
|| page_info || map || 15 || X || zipf || 0 ||
|| page_links || bag of maps || 50 || X || zipf || 20 ||

The second, `users`, was created by taking the unique user keys from `page_views` and adding additional columns.

|| '''Name''' || '''Type''' || '''Average Length''' || '''Cardinality''' || '''Distribution''' || '''Percent Null''' ||
|| name || string || 20 || 1.6M || unique || 7 ||
|| phone || string || 10 || 1.6M || zipf || 20 ||
|| address || string || 20 || 1.6M || zipf || 20 ||
|| city || string || 10 || 1.6M || zipf || 20 ||
|| state || string || 2 || 1.6M || zipf || 20 ||
|| zip || int || X || 1.6M || zipf || 20 ||

The third, `power_users`, has 500 rows, and has the same schema as users.  It was generated by skimming 500 unique names from
users.  This will produce a table that can be used to test fragment replicate type joins.

The fourth, `widerow`, has a very wide row (500 fields), consisting of one string and 499 integers.

`users`, `power_users`, and `widerow` are written in ASCII format, using Ctrl-A as the field delimiter.  They can be read using
!PigStorage.

`page_views` is written in as text data, with Ctrl-A as the field delimiter.  Maps in the file are delimited by Ctrl-C
between key value pairs and Ctrl-D between keys and values.  Bags in the file are delimited by Ctrl-B between tuples in the bag.
A special loader, !PigPerformance loader has been written to read this format. 

== Proposed Scripts ==

=== Scalability ===
'''Script S1'''

This script tests grouping, projecting, udf envocation, and filtering with a very wide row.  Covers scalability feature 3.
{{{
A = load '$widerow' using PigStorage('\u0001') as (name: chararray, c0: int, c1: int, ..., c500: int);
B = group A by name parallel $parrallelfactor;
C = foreach B generate group, SUM(A.c0) as c0, SUM(A.c1) as c1, ... SUM(A.c500) as c500;
D = filter C by c0 > 100 and c1 > 100 and c2 > 100 ... and c500 > 100;
store D into '$out';
}}}


'''Script S2'''
This script tests joining two inputs where a given value of the join key appears many times in both inputs.  This will test pig's
ability to handle large joins.  It covers scalability features 1 and 2.

TBD

Features not yet tested:  4.

=== Latency ===
'''Script L1'''

This script tests reading from a map, flattening a bag of maps, and use of bincond (features 2, 3, and 4).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, (int)action as action, (map[])page_info as page_info,
    flatten((bag{tuple(map[])})page_links) as page_links;
C = foreach B generate user,
    (action == 1 ? page_info#'a' : page_links#'b') as header;
D = group C by user $parallelfactor;
E = foreach D generate group, COUNT(C) as cnt;
store E into '$out';
}}}

'''Script L2'''

This script tests using a join small enough to do in fragment and replicate (feature 7). 
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, estimated_revenue;
alpha = load '$power_users' using PigStorage('\u0001') as (name, phone,
        address, city, state, zip);
beta = foreach alpha generate name;
C = join beta by name, A by user $parallelfactor;
store C into '$out';
}}}

'''Script L3'''

This script tests a join too large for fragment and replicate.  It also contains a join followed by a group by on the same key,
something that pig could potentially optimize by not regrouping.
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, (double)estimated_revenue;
alpha = load '$users' using PigStorage('\u0001') as (name, phone, address,
        city, state, zip);
beta = foreach alpha generate name;
C = join beta by name, A by user $parallelfactor;
D = group C by $0 $parallelfactor;
E = foreach D generate group, SUM(C.estimated_revenue);
store E into '$out';

}}}

'''Script L4'''

This script covers foreach generate with a nested distinct (feature 10).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, action;
C = group B by user $parallelfactor;
D = foreach C {
    aleph = B.action;
    beth = distinct aleph;
    generate group, COUNT(beth);
}
store D into '$out';
}}}


'''Script L5'''

This script does an anti-join.  This is useful because it is a use of cogroup that is not a regular join (feature 9).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user;
alpha = load '$users' using PigStorage('\u0001') as (name, phone, address,
        city, state, zip);
beta = foreach alpha generate name;
C = cogroup beta by name, A by user $parallelfactor;
D = filter C by COUNT(beta) == 0;
E = foreach D generate group;
store E into '$out';
}}}

'''Script L6'''

This script covers the case where the group by key is a significant percentage of the row (feature 12).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, action, (int)timespent as timespent, query_term, ip_addr, timestamp;
C = group B by (user, query_term, ip_addr, timestamp) $parallelfactor;
D = foreach C generate flatten(group), SUM(B.timespent);
store D into '$out';

}}}

'''Script L7'''

This script covers having a nested plan with splits (feature 11).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader() as (user, action, timespent, query_term,
            ip_addr, timestamp, estimated_revenue, page_info, page_links);
B = foreach A generate user, timestamp;
C = group B by user $parallelfactor;
D = foreach C {
    morning = filter B by timestamp < 43200;
    afternoon = filter B by timestamp >= 43200;
    generate group, COUNT(morning), COUNT(afternoon);
}
store D into '$out';
}}}

'''Script L8'''

This script covers group all (feature 13).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, (int)timespent as timespent, (double)estimated_revenue as estimated_revenue;
C = group B all;
D = foreach C generate SUM(B.timespent), AVG(B.estimated_revenue);
store D into '$out';
}}}

'''Script L9'''

This script covers order by of a single value (feature 15).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = order A by query_term $parallelfactor;
store B into '$out';
}}}

'''Script L10'''

This script covers order by of multiple values (feature 15).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent:int, query_term, ip_addr, timestamp,
        estimated_revenue:double, page_info, page_links);
B = order A by query_term, estimated_revenue desc, timespent $parallelfactor;
store B into '$out';
}}}


'''Script L11'''

This script covers distinct and union and reading from a wide row but using only one field (features: 1, 14).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user;
C = distinct B $parallelfactor;
alpha = load '$widerow' using PigStorage('\u0001');
beta = foreach alpha generate $0 as name;
gamma = distinct beta $parallelfactor;
D = union C, gamma;
E = distinct D $parallelfactor;
store E into '$out';
}}}

'''Script L12'''

This script covers multi-store queries (feature 16).
{{{
register pigperf.jar;
A = load '$page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user, action, (int)timespent as timespent, query_term,
    (double)estimated_revenue as estimated_revenue;
split B into C if user is not null, alpha if user is null;
split C into D if query_term is not null, aleph if query_term is null;
E = group D by user $parallelfactor;
F = foreach E generate group, MAX(D.estimated_revenue);
store F into 'highest_value_page_per_user';
beta = group alpha by query_term $parallelfactor;
gamma = foreach beta generate group, SUM(alpha.timespent);
store gamma into 'total_timespent_per_term';
beth = group aleph by action $parallelfactor;
gimel = foreach beth generate group, COUNT(aleph);
store gimel into 'queries_per_action';
}}}

Features not yet covered:  5 (bzip data), 8 (sorted join)