Posted to user@pig.apache.org by Paul B <pb...@gmail.com> on 2009/10/14 01:04:12 UTC
Stream within a Group
I'm setting up a Pig job that needs to stream a grouped set of data to an
instance of a Perl script. I need to ensure that a full group is run
through a single instance of the Perl script (I don't want only part of a
group as the Perl input). The Perl script expects one record per line (not a
group field + bag). It doesn't look like we can use the STREAM operator
inside a FOREACH. Does the following code do what I'm trying to do?
Does it guarantee that a full group is processed by one instance of the Perl
script?
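-- Load the request logs; every field is a chararray.
log = LOAD 'request*'
    AS (ts:chararray, vid:chararray, rh:chararray, rid:chararray);
-- Group by visitor id, using 12 reducers.
group_req = GROUP log BY vid PARALLEL 12;
-- Sort each group's bag by ts (lexicographically, since ts is a chararray),
-- then flatten it back into one tuple per record.
group_sort_req = FOREACH group_req {
    gsr = ORDER log BY ts;
    GENERATE FLATTEN(gsr);
};
-- Pipe the flattened records, one per line, through the Perl script.
sessioned = STREAM group_sort_req THROUGH `sessions.pl`;
DUMP sessioned;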
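Because the script receives flattened records rather than a group field + bag, it has to find group boundaries itself. With PARALLEL 12 there are far fewer script instances than visitors, so one instance will presumably see many vids back to back. The sketch below shows that boundary detection in Python rather than Perl, purely for illustration. It assumes Pig's default streaming format (tab-separated fields, one tuple per line, in LOAD schema order) and that each group's records arrive as one consecutive run; `parse` and `groups_by_vid` are hypothetical names, not part of the original sessions.pl.

```python
import sys
from itertools import groupby
from typing import Iterable, Iterator, List, Tuple

# (ts, vid, rh, rid), in the same order as the LOAD schema.
Record = Tuple[str, str, str, str]


def parse(lines: Iterable[str]) -> Iterator[Record]:
    """Split each tab-delimited line into the four schema fields."""
    for line in lines:
        line = line.rstrip("\r\n")
        if not line:
            continue  # skip blank lines
        ts, vid, rh, rid = line.split("\t")
        yield ts, vid, rh, rid


def groups_by_vid(lines: Iterable[str]) -> Iterator[Tuple[str, List[Record]]]:
    """Yield (vid, records) for each run of consecutive lines sharing a vid.

    Assumes every group arrives as one contiguous run; a group interleaved
    with another would come out as several separate runs.
    """
    for vid, run in groupby(parse(lines), key=lambda rec: rec[1]):
        yield vid, list(run)


if __name__ == "__main__":
    for vid, records in groups_by_vid(sys.stdin):
        # Placeholder: the real per-visitor session logic would go here.
        print(f"{vid}\t{len(records)}")
```

Using `itertools.groupby` keeps only one group in memory at a time, which matters when a single instance handles many visitors.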
Re: Stream within a Group
Posted by Alan Gates <ga...@yahoo-inc.com>.
What you propose will result in all of the records for a given
group going to a single instance of sessions.pl.
Alan.
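The guarantee follows from how a GROUP runs on Hadoop: in the usual execution plan the grouped FOREACH and the STREAM after it run in the reduce phase, hash partitioning sends every record with a given vid to the same reducer, and that reducer pipes its records through its own sessions.pl instance. The Python sketch below models that shuffle under simplifying assumptions: `zlib.crc32` stands in for the real partitioner, there is one script instance per reducer, and `partition`, `simulate` and `NUM_REDUCERS` are hypothetical names. It is a model of the behaviour, not Pig's implementation.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, str, str, str]  # (ts, vid, rh, rid)

NUM_REDUCERS = 12  # mirrors PARALLEL 12 in the script


def partition(vid: str, num_reducers: int = NUM_REDUCERS) -> int:
    """Pick a reducer for a group key.

    Stand-in for Hadoop's hash partitioning; crc32 is used only so the
    result is deterministic (Python's built-in str hash is salted per run).
    """
    return zlib.crc32(vid.encode("utf-8")) % num_reducers


def simulate(records: List[Record],
             num_reducers: int = NUM_REDUCERS) -> Dict[int, List[Record]]:
    """Return the record sequence each reducer's script instance would read."""
    # Shuffle: all records with the same vid go to the same reducer.
    shuffled: Dict[int, Dict[str, List[Record]]] = defaultdict(
        lambda: defaultdict(list))
    for rec in records:
        vid = rec[1]
        shuffled[partition(vid, num_reducers)][vid].append(rec)

    streams: Dict[int, List[Record]] = {}
    for reducer, groups in shuffled.items():
        stream: List[Record] = []
        # A reducer handles its keys one at a time, in key order; each group
        # is sorted by ts (the nested ORDER) and flattened before the next.
        for vid in sorted(groups):
            stream.extend(sorted(groups[vid], key=lambda rec: rec[0]))
        streams[reducer] = stream
    return streams


if __name__ == "__main__":
    sample = [("2", "A", "h", "r2"), ("1", "B", "h", "r3"),
              ("1", "A", "h", "r1"), ("3", "C", "h", "r4")]
    for reducer, stream in sorted(simulate(sample).items()):
        print(reducer, stream)
```

The two properties worth checking are the ones asked about: each vid lands in exactly one reducer's stream, and within that stream its records are consecutive and in ts order.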