Posted to user@pig.apache.org by Alan Gates <ga...@yahoo-inc.com> on 2010/03/31 19:33:18 UTC

Re: ORDER_LIST_DESC_EXTRACT - UDF required or can be done in nested FOREACH?

For session analysis and the like, a nested ORDER BY should do what
you need.

A = load 'weblog' as (userid, timestamp, url, ...);
B = group A by userid;
C = foreach B {
       C1 = order A by timestamp;
       generate group, session_analysis_udf(C1);
}
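
If the UDF's job is consecutive pairings, the logic over the ordered bag
could look roughly like the sketch below. It is plain Java with lists
standing in for Pig's DataBag and Tuple, and the names ConsecutivePairs and
of are made up for illustration; a real session_analysis_udf would extend
EvalFunc and read C1 from its input tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: plain lists stand in for Pig's DataBag and Tuple.
final class ConsecutivePairs {

    // Turns [a, b, c] into [[a, b], [b, c]].
    // Assumes the input is already ordered, e.g. by the nested ORDER BY above.
    static <T> List<List<T>> of(List<T> ordered) {
        List<List<T>> pairs = new ArrayList<>();
        for (int i = 0; i + 1 < ordered.size(); i++) {
            pairs.add(Arrays.asList(ordered.get(i), ordered.get(i + 1)));
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("first", "second", "third", "fourth");
        // prints [[first, second], [second, third], [third, fourth]]
        System.out.println(of(urls));
    }
}
```

The only state needed is the previous element, which the index-based loop
supplies; a list of fewer than two items yields no pairs.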

Alan.

On Mar 29, 2010, at 2:08 PM, Russell Jurney wrote:

> I have written several variations of UDFs to achieve the following:
>
> They take a bag of tuples, sort the tuples on a field, then perform
> operations using this order.  That could be simply returning a list of
> sorted values from fields in a tuple (to maintain sort between steps), or
> creating combinations of consecutive items - i.e. {(first, second),
> (second, third), (third, fourth)...}  This kind of operation is common in
> processing web logs within sessions (to get transitions from url1 to url2
> for funnel analysis), as well as other time series data.
>
> Basically, given a time series with unique IDs - group by ID, then sort the
> time series data by one or more fields, and return one of two things:
>
> 1) An ordered list of one field in the time series.
> 2) Consecutive pairings of one field in the time series.
>
> My question is: do I have to use a UDF to do this?  It seems that I do have
> to, if I want consecutive pairings.  But - can I rely on ordering of items
> in a relation within a FOREACH?  If so, it seems I might not need the UDF to
> simply order descending and extract a field, but would still need a UDF to
> output consecutive pairings - because there is no way to store the state
> of the previous tuple iterated in a FOREACH.  Is this correct?
>
> I have also found that you cannot have a UDF accept a DataBag and return a
> Tuple - Pig lets you do this but cannot process the tuple within a tuple
> that results.  So my fix is to return a databag of one tuple, then FLATTEN
> that bag.  Is this correct?  The docs speak of FLATTEN being able to do
> things UDFs can't - is this what that means?
>
> Thanks in advance!
>
> -------------------
>
> Example use + code follows.  Given a bag, this UDF returns an ordered list
> of values of a sorted field as a single tuple in a bag:
>
> DESCRIBE things;
> things: {grouped_id: int,sub_things_time_series: {thing_id: int,name: chararray,start_date: int,end_date: int}}
>
> extracted_ordered = FOREACH things GENERATE
>     ORDER_LIST_DESC_EXTRACT(sub_things_time_series, 1, 2) AS consecutive_sub_things;
> -- a describe of extracted_ordered is not helpful: {sub_things: {null}}
> A = SAMPLE extracted_ordered 0.01;
> DUMP A;
>
> ({(first, second, third)})
> ({(first, second)})
> ({(first, second, third, fourth)})
>
> Note - the alternative form of the UDF (not shown) DUMPs:
>
> ({((first, second), (second, third), (third, fourth))})
>
> /**
> * Given a databag of tuples, sort the tuples on a specified field, then
> * extract a(nother) specified field, and return the ordered list as many
> * fields in a single tuple within a single bag.  Flatten the bag to get
> * your sorted values in a tuple.
> *
> * Used to process bags like this:
> *
> * things: {thing_id: int,name: chararray,start_date: int,end_date: int}
> *
> * To get an output bag of one tuple containing the ordered values, like so:
> *
> * ordered_fields: {(first, second, third, fourth)}
> *
> * expects a Tuple of size >=2 where <br>
> * arg1 = bag of tuples <br>
> * arg2 = Field to extract <br>
> * arg3 = primary sort field idx. <br>
> * arg4 = secondary sort field idx. <br>
> * arg5 = tertiary sort field idx. <br> ...
> */
>
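> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.Iterator;
> import java.util.List;
>
> import org.apache.pig.EvalFunc;
> import org.apache.pig.backend.executionengine.ExecException;
> import org.apache.pig.data.DataBag;
> import org.apache.pig.data.DataType;
> import org.apache.pig.data.DefaultDataBag;
> import org.apache.pig.data.Tuple;
> import org.apache.pig.data.TupleFactory;
> import org.apache.pig.impl.logicalLayer.schema.Schema;
> import org.apache.pig.impl.util.WrappedIOException;
>
> public class ORDER_LIST_DESC_EXTRACT extends EvalFunc<DataBag>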
> {
>
>  public DataBag exec(Tuple input)
>      throws IOException
>  {
>    if (input == null || input.size() < 2)
>      return null;
>
>    try
>    {
>      DataBag bag = (DataBag) input.get(0);
>      Integer extractKey = (Integer) input.get(1);
>
>      // Get list of primary/secondary sorting indexes
>      ArrayList<Integer> keyIdxList = new ArrayList<Integer>();
>      for (int i = 2; i < input.size(); i++)
>      {
>        keyIdxList.add((Integer) input.get(i));
>      }
>
>      // put the Bag data into a collection
>      ArrayList<Tuple> tupleList = new ArrayList<Tuple>();
>      Iterator<Tuple> iter = bag.iterator();
>      while (iter.hasNext())
>      {
>        tupleList.add(iter.next());
>      }
>
>      // sort the collections using key comparator.
>      Collections.sort(tupleList, new DataBagSorter(keyIdxList));
>
>      // Now that the list of tuples is sorted, walk it in order and
>      // append the extracted field of each tuple to a single output tuple.
>      Tuple outputTuple = TupleFactory.getInstance().newTuple();
>
>      for (Tuple current : tupleList)
>      {
>        outputTuple.append((String) current.get(extractKey));
>      }
>
>      ArrayList<Tuple> newTupleList = new ArrayList<Tuple>();
>      newTupleList.add(outputTuple);
>
>      // Wrap the single tuple in a bag so the caller can FLATTEN it.
>      return new DefaultDataBag(newTupleList);
>    }
>    catch (Exception e)
>    {
>      throw WrappedIOException.wrap(
>          "Caught exception processing input of " + this.getClass().getName(), e);
>    }
>  }
>
>  @Override
>  public Schema outputSchema(Schema input)
>  {
>    return new Schema(new Schema.FieldSchema(
>        getSchemaName(this.getClass().getName().toLowerCase(), input),
>        DataType.BAG));
>  }
>
>  class DataBagSorter implements Comparator<Tuple>
>  {
>    private final List<Integer> keyList;
>
>    public DataBagSorter(List<Integer> keys)
>    {
>      this.keyList = keys;
>    }
>
>    public int compare(Tuple t1, Tuple t2)
>    {
>      for (Integer key : keyList)
>      {
>        try
>        {
>          int val = compareItem(t1.get(key), t2.get(key));
>          if (val != 0)
>          {
>            // no need to go to secondary key if primary values are not same
>            return val;
>          }
>        }
>        catch (ExecException e)
>        {
>          throw new RuntimeException(e);
>        }
>
>      }
>
>      return 0;
>    }
>  }
>
>  public int compareItem(Object val1, Object val2)
>  {
>    return ((Comparable) val1).compareTo((Comparable) val2);
>  }
> }