Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2010/03/19 22:31:06 UTC

[Pig Wiki] Update of "MapSideCogroup" by AlanGates

The "MapSideCogroup" page has been changed by AlanGates.
http://wiki.apache.org/pig/MapSideCogroup

--------------------------------------------------

New page:
= Pig Map Side Cogroup Proposal =

This document details the proposal to build a map-side cogroup operator in Pig.

== Restrictions ==
The map side cogroup would initially work with the following restrictions:

 1. All loaded files must be of the same storage type (no cogrouping Zebra and !PigStorage files).
 1. There must exist a loader for this storage type that implements Pig's !IndexableLoadFunc interface (currently only Zebra's !TableLoader meets this criterion).
 1. The loader must guarantee that all records with the same key value fall within a single split; the records for one key are never divided across splits.
 1. Inputs to cogroup must be aliases of load statements. No other operators are allowed between the load and the cogroup.
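
Restriction 3 is the one a loader has to enforce actively. As a rough illustration only, the Java sketch below shows one way split boundaries over sorted data could be moved forward so that no key value straddles two splits. The class `KeyAlignedSplits` and its `align` method are hypothetical and are not part of Pig or Zebra; keys are simplified to strings and splits to record offsets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Pig or Zebra): adjusts tentative split
// boundaries over a sorted key column so that no key value spans two splits.
class KeyAlignedSplits {

    // keys: the sorted cogroup keys, one entry per record.
    // tentativeStarts: ascending record offsets at which splits would start.
    // Returns the adjusted start offsets. Each boundary is moved forward to
    // the first record whose key differs from the record just before it.
    static List<Integer> align(List<String> keys, List<Integer> tentativeStarts) {
        List<Integer> aligned = new ArrayList<>();
        for (int start : tentativeStarts) {
            int pos = start;
            // While the boundary falls inside a run of equal keys, advance it.
            while (pos > 0 && pos < keys.size()
                    && keys.get(pos).equals(keys.get(pos - 1))) {
                pos++;
            }
            // Drop boundaries that ran off the end or collapsed onto the
            // previous boundary.
            boolean isNew = aligned.isEmpty()
                    || aligned.get(aligned.size() - 1) < pos;
            if (pos < keys.size() && isNew) {
                aligned.add(pos);
            }
        }
        return aligned;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "a", "b", "b", "b", "c", "d");
        // Offset 3 falls inside the run of "b" records, so it moves to offset 5.
        System.out.println(align(keys, Arrays.asList(0, 3, 6))); // prints [0, 5, 6]
    }
}
```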

== Design ==

The map side cogroup would work as follows:

 1. Add an interface !KeyValuesNotSplitLoader with a single method.  Calling this method tells the loader that it must not split keys across splits for this particular load.
 1. Run an initial MR job to build an index on the leftmost (presumably the largest) input using Pig's !MergeJoinIndexer.
 1. Pig will need to construct the second MR job such that the splits from the leftmost input are used as the splits for the job.  The other (presumably smaller) inputs will be loaded via the !IndexableLoadFunc.
 1. Currently !PigSplit contains the split number.  This information will need to be recorded in the UDFContext so that the POMergeCogroup operator can retrieve it.  This transfer should probably be done in !PigInputFormat.createRecordReader.
 1. Inside a new POMergeCogroup operator:
{{{
    Determine which split we are in
    if (not last split) {
        Using the index generated by the MergeJoinIndexer,
              determine the first key of the next split
    }
    if (in first split) {
        open each of the other inputs at the beginning
    } else {
        Using the index generated by the MergeJoinIndexer, determine the first key of our split
        open each of the other inputs at this first key
    }

    construct a heap, using the cogrouping key as the key for the heap
    foreach (input) {
        pull first record
        annotate with input number it came from
        insert into heap
    }
    while (there are records in heap) {
        pull top record from heap
        place in bag based on which input it came from
        if (key from large && EOF on split) continue
        pull record from input that last record from top of heap came from
        if (key pulled < first key in next split) insert into heap
    }
    output final record
}}}
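
The pseudocode above can be sketched in Java roughly as follows. This is a minimal, self-contained illustration and not the proposed POMergeCogroup implementation. The class `MergeCogroupSketch`, its nested types, and the `input` helper are hypothetical. Records are simplified to string key/value pairs, each input is assumed to be already positioned at the first key of the split, and the first key of the next split is passed in directly instead of being read from the index (`null` stands for the last split). The sketch also assumes that one output tuple is emitted per distinct key, which the pseudocode leaves implicit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the heap-based merge; not Pig code.
class MergeCogroupSketch {

    // A record annotated with the number of the input it came from.
    static final class Tagged {
        final String key;
        final String value;
        final int input;
        Tagged(String key, String value, int input) {
            this.key = key;
            this.value = value;
            this.input = input;
        }
    }

    // One output tuple: the key plus one bag per input.
    static final class Group {
        final String key;
        final List<List<String>> bags = new ArrayList<>();
        Group(String key, int numInputs) {
            this.key = key;
            for (int i = 0; i < numInputs; i++) bags.add(new ArrayList<>());
        }
        @Override public String toString() { return key + bags; }
    }

    // inputs.get(0) is the large input's split; the others are the smaller
    // inputs. Every record is a {key, value} pair and inputs are key-sorted.
    static List<Group> cogroup(List<Iterator<String[]>> inputs, String nextSplitFirstKey) {
        PriorityQueue<Tagged> heap = new PriorityQueue<>(
                Comparator.comparing((Tagged t) -> t.key).thenComparingInt(t -> t.input));
        // Seed the heap with the first record of each input.
        for (int i = 0; i < inputs.size(); i++) {
            refill(heap, inputs, i, nextSplitFirstKey);
        }
        List<Group> out = new ArrayList<>();
        Group current = null;
        while (!heap.isEmpty()) {
            Tagged top = heap.poll();
            // A new key closes the previous group and starts the next one.
            if (current == null || !current.key.equals(top.key)) {
                current = new Group(top.key, inputs.size());
                out.add(current);
            }
            current.bags.get(top.input).add(top.value);
            // Pull the next record from the input the top record came from.
            refill(heap, inputs, top.input, nextSplitFirstKey);
        }
        return out;
    }

    // Reads one record from input i and inserts it into the heap only if its
    // key sorts before the first key of the next split.
    private static void refill(PriorityQueue<Tagged> heap, List<Iterator<String[]>> inputs,
                               int i, String bound) {
        Iterator<String[]> it = inputs.get(i);
        if (!it.hasNext()) return; // end of this input (or of the split)
        String[] rec = it.next();
        if (bound == null || rec[0].compareTo(bound) < 0) {
            heap.add(new Tagged(rec[0], rec[1], i));
        }
        // Otherwise the key belongs to a later split and this input is done.
    }

    // Test helper: builds an input from flat key, value, key, value, ... pairs.
    static Iterator<String[]> input(String... kv) {
        List<String[]> rows = new ArrayList<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            rows.add(new String[] {kv[i], kv[i + 1]});
        }
        return rows.iterator();
    }

    public static void main(String[] args) {
        List<Iterator<String[]>> inputs = new ArrayList<>();
        inputs.add(input("b", "L1", "b", "L2", "c", "L3")); // large input, this split
        inputs.add(input("b", "S1", "d", "S2", "e", "S3")); // small input, opened at "b"
        // The next split of the large input starts at key "e".
        System.out.println(cogroup(inputs, "e"));
        // prints [b[[L1, L2], [S1]], c[[L3], []], d[[], [S2]]]
    }
}
```

In this example, key `d` appears only in the small input but sorts before the next split's first key `e`, so it is emitted by this split with an empty bag for the large input. Key `e` is left for the task that processes the next split.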
    

== Example ==
Assume a set of tables `mytables` with one large file and n smaller files.  These files have the following schemas:

{{{
large: (k, x, y)
small1: (k, x, z)
...
smalln: (k, z, alpha)
}}}

Then the Pig Latin script would look like:

{{{
A = load 'large';
B1 = load 'small1';
...
Bn = load 'smalln';
C = cogroup A by k, B1 by k, ..., Bn by k using "merge";
}}}

And C would have a schema of:
{{{
k, bag: large{(k, x, y)}, bag: small1{(k, x, z)}, ..., bag: smalln{(k, z, alpha)}
}}}
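
To make the shape of C concrete, the following Java sketch runs a naive in-memory cogroup over two tiny inputs. It only illustrates the output structure and is not the map-side algorithm. The class `CogroupShapeDemo` is hypothetical and the sample rows are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a naive in-memory cogroup used to show the shape of C.
// It is neither the map-side algorithm nor Pig code.
class CogroupShapeDemo {

    // inputs.get(i) holds the rows of input i; row[0] is the key k.
    // Returns key -> one bag per input, in input order.
    static Map<String, List<List<String>>> cogroup(List<List<String[]>> inputs) {
        Map<String, List<List<String>>> result = new TreeMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            for (String[] row : inputs.get(i)) {
                List<List<String>> bags = result.get(row[0]);
                if (bags == null) {
                    // First time this key is seen: create an empty bag per input.
                    bags = new ArrayList<>();
                    for (int j = 0; j < inputs.size(); j++) {
                        bags.add(new ArrayList<>());
                    }
                    result.put(row[0], bags);
                }
                bags.get(i).add("(" + String.join(", ", row) + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> large = new ArrayList<>();   // schema (k, x, y)
        large.add(new String[] {"1", "x1", "y1"});
        large.add(new String[] {"2", "x2", "y2"});
        List<String[]> small1 = new ArrayList<>();  // schema (k, x, z)
        small1.add(new String[] {"1", "x9", "z9"});

        List<List<String[]>> inputs = new ArrayList<>();
        inputs.add(large);
        inputs.add(small1);
        System.out.println(cogroup(inputs));
        // prints {1=[[(1, x1, y1)], [(1, x9, z9)]], 2=[[(2, x2, y2)], []]}
    }
}
```

Key 2 has no match in the small input, so its second bag is empty; the tuple is still produced.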