You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Jonathan Coveney <jc...@gmail.com> on 2011/02/02 23:27:38 UTC

Nondeterministic results depending on whether you have an intermediate store

I am currently running this on:
https://svn.apache.org/repos/asf/pig/branches/branch-0.8

So, I have a rather long script whose output depends on whether or not I
have an intermediate store. Clearly this is undesirable :)
I was just curious whether this might be related to any known issues.

I am not sure how to replicate this for the list... it's a rather long
script with a lot going on, and I can't really share any of the source
data. If it's not a known issue, I can do some work to try to replicate it
using dummy data sources. Either way, a very surprising find...

I appreciate any help on this. I know that it is not the cleanest code (I
didn't write it ;), although that is in no small part due to the fact that
we've had to side step a lot of annoying little bugs.

The bug is at the end of the script: if you comment out the store of set1234,
you get different results than if you keep it. The results with the set1234
store are correct; without it, some values are excluded from the output, and
there is no rhyme or reason to which ones (that we can discern).

REGISTER /home/jcoveney/pig-0.8.0/squeal.jar;

-- ---------------------------------------------------------------------------
-- Inputs
-- ---------------------------------------------------------------------------

-- Raw fact records for a single input file (date=20110127, c1=09).
-- No USING clause, so Pig's default PigStorage loader is used; the schema is
-- declared inline.
big_table_fact = LOAD '/data/big_table/date=20110127/c1=09/20110127_00_big_table_aster_09.bcp_1.gz'
    AS (source:chararray, ss2k:int, osid:chararray, browser_type:int,
        ip_address:long, record_type:int, url:chararray,
        c1:chararray, c2:chararray, c3:chararray, c4:chararray,
        c5:chararray, c6:chararray, c7:chararray, c8:chararray,
        c9:chararray, c10:chararray, c11:chararray, c12:chararray,
        c13:chararray, c14:chararray, c15:chararray, c16:chararray,
        iab_flag:int, akamai_id:long, uid:chararray, recorded:chararray,
        xml_version:chararray, uahash:chararray);

-- Comma-separated IP-range lookup: rows carry a numeric range
-- [beginipnum, endipnum] plus the country for that range.
m_lookup = LOAD '/user/jcoveney/mlookup.csv'
    USING PigStorage(',')
    AS (beginip:chararray, endip:chararray,
        beginipnum:long, endipnum:long,
        countryCode:chararray, countryName:chararray);

-- Code lookup keyed by meaning_val; supplies gender and a min/max value
-- range for each code (joined against the parsed c3 codes further down).
small_lookup = LOAD '/user/vnair/small_lookup_lookup.txt'
    AS (meaning_val:chararray, tye:chararray, gender:chararray,
        min_value:int, max_value:int);


-- Project only the columns needed downstream. time_id is ss2k integer-divided
-- by 86400 (presumably ss2k is in seconds, making time_id a day bucket --
-- TODO confirm).
A = FOREACH big_table_fact GENERATE uid, c2, c3, ip_address, ss2k, ss2k / 86400 AS time_id;

-- Keep only records whose c2 is the target value '6693868'.
-- (This filter does not involve small_lookup.)
B = FILTER A BY c2 == '6693868';

-- Attach a country via m_lookup: pair every record with every m_lookup row,
-- then keep the pairs whose [beginipnum, endipnum] range contains the IP.
-- NOTE(review): CROSS costs |B| * |m_lookup| rows; acceptable only while B
-- stays small after the c2 filter.
D = CROSS B, m_lookup;

E = FILTER D BY (ip_address >= beginipnum AND ip_address <= endipnum);

-- Parse c3, which appears to hold '|'-separated "<n>:<value>" segments:
--   * the value after "4:" becomes birth_year when it is exactly four digits,
--     otherwise 0 (0 is treated as "unknown year" by the final filter);
--   * the value after "3:" is split on ',' into up to four meaning codes.
-- The IS NOT NULL guard sends records with no "4:" segment (or a null c3) to
-- the 0 branch as well. Without it, the `matches` test is null and the bincond
-- does not yield 0 (a null condition makes Pig's bincond return null --
-- TODO confirm on 0.8).
F = FOREACH E GENERATE
        squeal.bug.compressuid(uid) AS uid,
        c2, ss2k, time_id, countryCode,
        ((REGEX_EXTRACT(c3, '(4:)([^|]*)', 2) IS NOT NULL
          AND REGEX_EXTRACT(c3, '(4:)([^|]*)', 2) matches '\\d{4}')
            ? (int)REGEX_EXTRACT(c3, '(4:)([^|]*)', 2)
            : (int)0) AS birth_year,
        FLATTEN(STRSPLIT(REGEX_EXTRACT(c3, '(3:)([^|]*)', 2), ',', 4))
            AS (meaning_val1:chararray, meaning_val2:chararray,
                meaning_val3:chararray, meaning_val4:chararray);

-- Extra pass-through projection. The original author found it seemed to be
-- necessary to make sure the FLATTEN in F takes effect before its output
-- fields are referenced by name.
G = FOREACH F GENERATE *;

-- Transpose the four meaning_val columns into rows (one row per meaning_val)
-- so that distinct meaning_vals can be counted per segment, and uids with more
-- than one distinct meaning_val in a segment can be removed further down.
H = FOREACH G GENERATE uid, c2, ss2k, time_id, birth_year, countryCode, meaning_val1 AS meaning_val;

I = FOREACH G GENERATE uid, c2, ss2k, time_id, birth_year, countryCode, meaning_val2 AS meaning_val;

H_a = FOREACH G GENERATE uid, c2, ss2k, time_id, birth_year, countryCode, meaning_val3 AS meaning_val;

I_a = FOREACH G GENERATE uid, c2, ss2k, time_id, birth_year, countryCode, meaning_val4 AS meaning_val;

J = UNION H, I, I_a, H_a;

-- Assign a segment from the shape of the meaning_val code (`matches` is a
-- whole-string Java regex match):
--   1 = age       '1' followed by exactly two characters, or exactly '90'
--   2 = gender    '9' followed by exactly two characters
--   3 = income    '5' followed by exactly two characters
--   4 = children  '6' followed by exactly two characters
--   0 = invalid   anything else
X = FOREACH J GENERATE *,
        (((meaning_val matches '1.{2}') OR (meaning_val == '90')) ? 1 :
         ((meaning_val matches '9.{2}') ? 2 :
         ((meaning_val matches '5.{2}') ? 3 :
         ((meaning_val matches '6.{2}') ? 4 : 0)))) AS segment;

-- Remove invalid codes: keep only segments 1-4.
X1 = FILTER X BY (segment == 1 OR segment == 2 OR segment == 3 OR segment == 4);

-- Per uid, count the distinct segments and the distinct meaning_vals.
Y = GROUP X1 BY (uid);

Z = FOREACH Y {
        dst_segment = DISTINCT X1.segment;
        dst_meaning_val = DISTINCT X1.meaning_val;
        GENERATE X1, COUNT(dst_segment) AS ct_seg, COUNT(dst_meaning_val) AS ct_meaning_val;
    };

-- segment is a function of meaning_val, so ct_seg <= ct_meaning_val, with
-- equality exactly when each segment present for the uid has a single
-- distinct meaning_val. uids with conflicting codes in any segment are dropped.
Z1 = FILTER Z BY ct_seg == ct_meaning_val;

-- Un-nest the surviving bags back to one row per (record, meaning_val).
Z2 = FOREACH Z1 GENERATE FLATTEN(X1) AS
        (uid:chararray, c2:chararray, ss2k:int, time_id:int, birth_year:int,
         countryCode:chararray, meaning_val:chararray, segment:int);

-- Resolve each meaning_val code against small_lookup. This is an inner join,
-- so codes that do not appear in small_lookup are dropped.
setA = JOIN Z2 BY meaning_val, small_lookup BY meaning_val;

-- Keep the record key columns plus the looked-up gender and value range.
setB = FOREACH setA GENERATE
           Z2::uid AS uid,
           Z2::c2 AS c2,
           Z2::ss2k AS ss2k,
           Z2::time_id AS time_id,
           Z2::birth_year AS birth_year,
           Z2::countryCode AS countryCode,
           Z2::segment AS segment,
           small_lookup::gender AS gender,
           small_lookup::min_value AS min_value,
           small_lookup::max_value AS max_value;

-- One relation per segment, each de-duplicated so the full outer joins that
-- follow do not multiply matching rows (a cartesian blow-up per key).
SPLIT setB INTO
    set1_d IF segment == 1,
    set2_d IF segment == 2,
    set3_d IF segment == 3,
    set4_d IF segment == 4;

set1 = DISTINCT set1_d;
set2 = DISTINCT set2_d;
set3 = DISTINCT set3_d;
set4 = DISTINCT set4_d;

-- Align the four per-segment sets into one row per
-- (uid, c2, ss2k, time_id, birth_year, countryCode), turning each segment's
-- looked-up values into columns:
--   segment 1 -> min_age / max_age      segment 2 -> gender
--   segment 3 -> min_income / max_income segment 4 -> has_children
-- Each step is a FULL outer join, so a key present on only one side survives;
-- the bincond on each key column takes the value from whichever side is
-- non-null.
-- NOTE(review): rows whose key contains a null field (e.g. birth_year) may
-- not match across sets -- TODO confirm Pig's null handling for multi-column
-- join keys.

set12_a = JOIN set1 BY (uid, c2, ss2k, time_id, birth_year, countryCode) FULL,
               set2 BY (uid, c2, ss2k, time_id, birth_year, countryCode);

set12 = FOREACH set12_a GENERATE
            ((set1::uid IS NULL) ? set2::uid : set1::uid) AS uid,
            ((set1::c2 IS NULL) ? set2::c2 : set1::c2) AS c2,
            ((set1::ss2k IS NULL) ? set2::ss2k : set1::ss2k) AS ss2k,
            ((set1::time_id IS NULL) ? set2::time_id : set1::time_id) AS time_id,
            ((set1::countryCode IS NULL) ? set2::countryCode : set1::countryCode) AS countryCode,
            ((set1::birth_year IS NULL) ? set2::birth_year : set1::birth_year) AS birth_year,
            set2::gender AS gender,
            set1::min_value AS min_age,
            set1::max_value AS max_age;

set123_a = JOIN set12 BY (uid, c2, ss2k, time_id, birth_year, countryCode) FULL,
                set3 BY (uid, c2, ss2k, time_id, birth_year, countryCode);

set123 = FOREACH set123_a GENERATE
             ((set12::uid IS NULL) ? set3::uid : set12::uid) AS uid,
             ((set12::c2 IS NULL) ? set3::c2 : set12::c2) AS c2,
             ((set12::ss2k IS NULL) ? set3::ss2k : set12::ss2k) AS ss2k,
             ((set12::time_id IS NULL) ? set3::time_id : set12::time_id) AS time_id,
             ((set12::countryCode IS NULL) ? set3::countryCode : set12::countryCode) AS countryCode,
             ((set12::birth_year IS NULL) ? set3::birth_year : set12::birth_year) AS birth_year,
             set12::gender AS gender,
             set12::min_age AS min_age,
             set12::max_age AS max_age,
             set3::min_value AS min_income,
             set3::max_value AS max_income;

set1234_a = JOIN set123 BY (uid, c2, ss2k, time_id, birth_year, countryCode) FULL,
                 set4 BY (uid, c2, ss2k, time_id, birth_year, countryCode);

-- Every output column gets an explicit alias so downstream references
-- (min_age, max_age, min_income, max_income) do not depend on Pig resolving
-- short names against the disambiguated set123::* field names.
set1234 = FOREACH set1234_a GENERATE
              ((set123::uid IS NULL) ? set4::uid : set123::uid) AS uid,
              ((set123::c2 IS NULL) ? set4::c2 : set123::c2) AS c2,
              ((set123::ss2k IS NULL) ? set4::ss2k : set123::ss2k) AS ss2k,
              ((set123::time_id IS NULL) ? set4::time_id : set123::time_id) AS time_id,
              ((set123::countryCode IS NULL) ? set4::countryCode : set123::countryCode) AS countryCode,
              ((set123::birth_year IS NULL) ? set4::birth_year : set123::birth_year) AS birth_year,
              set123::gender AS gender,
              set123::min_age AS min_age,
              set123::max_age AS max_age,
              set123::min_income AS min_income,
              set123::max_income AS max_income,
              set4::min_value AS has_children;

-- Drop rows whose parsed birth year contradicts the looked-up age range.
-- A row is kept when any of the following holds:
--   * the implied age (2011 - birth_year) lies in [min_age, max_age + 1];
--   * birth_year is 0 (no usable four-digit year was parsed);
--   * min_age is null (no age segment for this key).
-- Both range bounds must hold, so they are ANDed. With OR, the range test is
-- true for every age whenever min_age <= max_age + 1, and nothing is filtered.
-- NOTE(review): 2011 is hard-coded as the reference year here and in `final`.
final_a = FILTER set1234 BY (((2011 - birth_year >= min_age) AND (2011 - birth_year <= max_age + 1))
                             OR birth_year == 0
                             OR min_age IS NULL);

-- When a birth year is known, both min_age and max_age become the exact
-- implied age; otherwise the looked-up range is kept.
final = FOREACH final_a GENERATE
            uid, c2, ss2k, time_id, countryCode, gender,
            ((birth_year == 0) ? min_age : 2011 - birth_year) AS min_age,
            ((birth_year == 0) ? max_age : 2011 - birth_year) AS max_age,
            min_income, max_income, has_children;

-- The author reports that results differ depending on whether set1234 is
-- also stored (they are correct with this STORE present).
-- NOTE(review): that suggests a Pig 0.8 plan/optimizer issue rather than
-- script logic. As a diagnostic, try `pig -optimizer_off All` (or disable
-- individual rules with -t) and compare -- TODO confirm.
STORE set1234 INTO '/user/test_i.bcp' USING PigStorage('\t');
STORE final INTO '/user/test_k.bcp' USING PigStorage('\t');