Posted to user@pig.apache.org by java8964 <ja...@hotmail.com> on 2014/04/17 21:09:10 UTC

How to flatten array of Tuple



Hi, 
With Pig 12.0, I have the following data stored in Avro format:
grunt> describe all_contacts;
all_contacts: {account_id: long,contact_id: chararray,sequence_id: int,state: chararray,name: chararray,kind: chararray,prefix_name: chararray,first_name: chararray,middle_name: chararray,company_name: chararray,job_title: chararray,source_name: chararray,source_details: chararray,provider_name: chararray,provider_details: chararray,created_at: long,create_source: chararray,updated_at: long,update_source: chararray,accessed_at: long,deleted_at: long,delta: boolean,contact_fields: {ARRAY_ELEM: (custom_field_id: chararray,value: chararray,date_value: long)},related_contacts: {ARRAY_ELEM: (related_contact_id: chararray,kind: chararray,role: chararray,created_at: long,updated_at: long,deleted_at: long)},contact_channels: {ARRAY_ELEM: (channel_id: chararray,address: chararray,state: chararray,create_source: chararray,update_source: chararray,created_at: long,updated_at: long,deleted_at: long,description: chararray,history: chararray,source_ip: long,optin_at: long,optout_at: long,confirmed: chararray)},contact_notes: {ARRAY_ELEM: (note_id: chararray,content: chararray,created_at: long,updated_at: long)},contact_service_addresses: {ARRAY_ELEM: (service_address_id: chararray,type: chararray,address: chararray,sort_order: int,kind: chararray,created_at: long,updated_at: long,create_source: chararray,update_source: chararray,deleted_at: long)},contact_street_addresses: {ARRAY_ELEM: (street_address_id: chararray,kind: chararray,street: chararray,city: chararray,state: chararray,postal_code: chararray,country: chararray,sort_order: int,created_at: long,create_source: chararray,updated_at: long,update_source: chararray,deleted_at: long)}}
As you can see, each contact row has an array of associated channels, and each channel is a struct in Avro, or a Tuple in Pig. The same applies to contact_service_addresses. Both the contact_channels and the contact_service_addresses tuples have an "address" field. My requirement is to compare the address from contact_channels to the addresses in contact_service_addresses: if it matches, count this channel as one; otherwise, ignore this channel.
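To make the requirement concrete, here is a minimal Python sketch (not Pig) of the per-contact logic. The sample record and the helper name count_matching_channels are made up for illustration; only the field names come from the schema above.

```python
# Hypothetical sample contact; field names follow the schema, values are invented.
contact = {
    "contact_channels": [
        {"channel_id": "c1", "address": "a@example.com", "state": "A"},
        {"channel_id": "c2", "address": "b@example.com", "state": "A"},
    ],
    "contact_service_addresses": [
        {"service_address_id": "s1", "address": "a@example.com"},
        {"service_address_id": "s2", "address": "x@example.com"},
    ],
}


def count_matching_channels(contact):
    """Count channels whose address equals any service address of the same contact."""
    service_addresses = {
        sa["address"] for sa in contact["contact_service_addresses"]
    }
    # Each matching channel counts once, no matter how many service addresses it matches.
    return sum(
        1 for ch in contact["contact_channels"] if ch["address"] in service_addresses
    )


matched = count_matching_channels(contact)
print(matched)
```

Note that the whole channel record stays together here while only its address subfield is compared, which is the part I cannot express in Pig.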
I tried different ways of doing this in Pig, but I am running into problems. The big one is how to keep the Tuple together while comparing only one of its subfields to a subfield of another tuple.
The following example shows what I did and where I am stuck.
-- load all the contacts
all_contacts = LOAD 'data' USING org.apache.pig.piggybank.storage.avro.AvroStorage();
-- generate contact, channels and service_addresses with needed fields
non_deleted_channels_contacts = FOREACH all_contacts {
    csa_addresses = FOREACH contact_service_addresses GENERATE address;
    channels = FOREACH contact_channels GENERATE channel_id, address, deleted_at, state;
    non_deleted_channels = FILTER channels BY state != 'D';
    GENERATE account_id, contact_id, deleted_at as contact_deleted_at, csa_addresses, non_deleted_channels;
};
-- from here on I am not sure how to proceed; this is what I tried:
match_address_non_deleted_channels_contacts = FOREACH non_deleted_channels_contacts
    GENERATE account_id, contact_id, contact_deleted_at,
    FLATTEN(csa_addresses.address) as csa_address,
    FLATTEN(non_deleted_channels.address) as cc_address,
    FLATTEN(non_deleted_channels.channel_id) as cc_channel_id;
I don't want to flatten the data, but I cannot do the comparison in a nested FOREACH, because a FOREACH in Pig can only be nested 2 levels deep. To compare the address of each channel to the address of each service_address, I would need a 3-level FOREACH, like this:
FOREACH contacts {
    FOREACH service_addresses {
        FOREACH channels {
        }
    }
}
which is not supported in Pig 12, so I gave up on this approach.
But if I use FLATTEN, I have to flatten both (channels.address) and (channels.channel_id), which expands the data in a way I don't want. For example, if one contact has 4 service_addresses and 2 channels, I expect 8 rows for this contact after FLATTEN (4 x 2 = 8). But Pig gives me back 16 rows (4 service_address.address x 2 channels.address x 2 channels.channel_id = 16 rows).
Pig treats the address and the channel_id from the channel tuple as separate elements in the flatten operation, which makes the rows grow far too fast. If I need additional fields from the channel tuple, the row count explodes even further for no reason. I don't know how to flatten the array of tuples as a whole; right now I can only flatten one field of the tuple at a time.
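The row counts above can be reproduced with a small Python model (not Pig) that treats every FLATTEN in one GENERATE as one factor of a cross product. The sample values and variable names are invented for illustration.

```python
from itertools import product

# One hypothetical contact: 4 service addresses and 2 channels (channel_id, address).
csa_addresses = ["sa1", "sa2", "sa3", "sa4"]
channels = [("ch1", "addr1"), ("ch2", "addr2")]

# Flattening channels.address and channels.channel_id as two separate bags:
# every address is paired with every channel_id, so 4 x 2 x 2 rows.
separate = list(
    product(
        csa_addresses,
        [address for _, address in channels],
        [channel_id for channel_id, _ in channels],
    )
)

# Flattening each channel tuple as a whole: 4 x 2 rows, fields stay together.
whole = list(product(csa_addresses, channels))

print(len(separate), len(whole))
```

In the first variant the fields of a channel are also mixed up, for example "addr1" ends up next to "ch2", so the extra rows are not just duplicates but wrong combinations.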
If I do this:
match_address_non_deleted_channels_contacts = FOREACH non_deleted_channels_contacts
    GENERATE account_id, contact_id, contact_deleted_at,
    FLATTEN(csa_addresses.address) as csa_address,
    FLATTEN(non_deleted_channels) as channels;
it gives me this error:
2014-04-17 15:02:16,384 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1031: Incompatable schema: left is "channels:NULL", right is "non_deleted_channels::channel_id:chararray,non_deleted_channels::address:chararray,non_deleted_channels::deleted_at:long,non_deleted_channels::state:chararray"
Is there a way to flatten the array of tuples so that each tuple comes out as a whole and I can use its individual fields later, instead of flattening the fields one by one and making the rows explode incorrectly?
I googled this case, and the only suggestion I could find is to NOT flatten the array. But then, in my case, I cannot compare a field of one tuple to a field of another tuple.
I hope I have explained my problem clearly. Any suggestion is appreciated.
Thanks
Yong