Posted to user@pig.apache.org by Paul B <pb...@gmail.com> on 2009/10/14 01:04:12 UTC

Stream within a Group

I'm setting up a Pig job that needs to stream a grouped set of data to an
instance of a Perl script.  I need to ensure that a full group is run
through a single instance of the Perl script (I don't want only part of a
group as the Perl input).  The Perl script expects one record per line (not
a group field + bag).  It doesn't look like we can use the STREAM operator
inside a FOREACH.  Does the following code do what I'm trying to do?
Does it guarantee that a full group is processed by one instance of the Perl
script?

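-- Load the raw request logs (PigStorage, tab-delimited by default).
log = LOAD 'request*'
  AS (ts:chararray, vid:chararray, rh:chararray, rid:chararray);

-- Collect every request for a visitor id (vid) into one group.
group_req = GROUP log BY vid PARALLEL 12;

-- Sort each group's bag by ts (lexicographically, since ts is a chararray),
-- then flatten it back to one record per row. The bag is named after the
-- grouped relation, so it is "log", not "log1".
group_sort_req = FOREACH group_req {
                    gsr = ORDER log BY ts;
                    GENERATE FLATTEN(gsr);
                    };

-- Pipe the flattened records, one per line, through the Perl script.
sessioned = STREAM group_sort_req THROUGH `sessions.pl`;
DUMP sessioned;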

Re: Stream within a Group

Posted by Alan Gates <ga...@yahoo-inc.com>.
What you propose will result in all of the records for a given group
going to a single instance of sessions.pl.

Alan.
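A single sessions.pl instance will typically receive several groups back to back (every vid routed to its reducer), so the script has to notice where one vid's records end and the next begin. Below is a minimal sketch of that boundary detection, written in Python for brevity rather than Perl. It assumes Pig's default tab-delimited, one-record-per-line streaming format and that each group's records arrive consecutively in ts order, as the GROUP / ORDER / FLATTEN pipeline above should produce. The names parse, runs_by_vid and summarize, and the summary it prints, are hypothetical stand-ins for whatever sessionizing sessions.pl actually does.

```python
import sys
from itertools import groupby

# Assumed field order, matching the LOAD schema in the question.
FIELDS = ("ts", "vid", "rh", "rid")


def parse(line):
    """Split one streamed record (tab-separated, newline-terminated) into a dict."""
    return dict(zip(FIELDS, line.rstrip("\n").split("\t")))


def runs_by_vid(lines):
    """Yield (vid, records) for each contiguous run of records sharing a vid.

    groupby only merges *adjacent* records, so this relies on each group's
    records arriving consecutively in the input stream.
    """
    records = (parse(line) for line in lines if line.strip())
    for vid, run in groupby(records, key=lambda r: r["vid"]):
        yield vid, list(run)


def summarize(lines):
    """Hypothetical per-group output: vid, request count, first ts, last ts."""
    for vid, run in runs_by_vid(lines):
        yield "\t".join([vid, str(len(run)), run[0]["ts"], run[-1]["ts"]])


if __name__ == "__main__":
    # Read records from Pig on stdin; write one tab-delimited result per group.
    for out in summarize(sys.stdin):
        print(out)
```

Because groupby only merges adjacent records, a vid that reappeared later in the stream would be treated as a new group; the design leans on the reduce-side pipeline keeping each group contiguous rather than buffering the whole input.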
