Posted to user@pig.apache.org by Thomas Edison <ju...@gmail.com> on 2013/05/06 05:11:07 UTC

Pig Unique Counts on Multiple Subsets of a Large Input

Hi there,

I have a huge input on HDFS and I would like to use Pig to calculate
several unique-count metrics. To help explain the problem more easily, I assume
the input file has the following schema:

userId:chararray, dimensionA_key:chararray, dimensionB_key:chararray,
dimensionC_key:chararray, activity:chararray, ...

Each record represents an activity performed by that userId.

Based on the value of the activity field, each activity record is mapped
to one or more categories. There are about 10 categories in total.

Now I need to count the number of unique users for different dimension
combinations (i.e. A, B, C, A+B, A+C, B+C, A+B+C) for each activity
category.

What would be the best practice for performing such a calculation?

I have tried several approaches. Although I can get the results I want, it takes
a very long time (days). I found that most of the time is spent in
the map phase. It looks like the script loads the huge input file
every time it calculates one unique count. Is there a way to
improve this behavior?

I also tried something similar to the script below, but it looks like it hits the
memory cap for a single reducer and just gets stuck at the last reducer step.

source = load ... as (userId:chararray, dimensionA_key:chararray,
dimensionB_key:chararray, dimensionC_key:chararray,
activity:chararray, ...);
a = group source by (dimensionA_key, dimensionB_key);
b = foreach a {
    userId1 = udf.newUserIdForCategory1(userId, activity);
    -- this udf returns the original user id if the activity should be
    -- mapped to Category1 and None otherwise
    userId2 = udf.newUserIdForCategory2(userId, activity);
    userId3 = udf.newUserIdForCategory3(userId, activity);
    ...
    userId10 = udf.newUserIdForCategory10(userId, activity);
    generate FLATTEN(group), COUNT(userId1), COUNT(userId2),
COUNT(userId3), ..., COUNT(userId10);
}
store b ...;

Thanks.

T.E.

Re: Pig Unique Counts on Multiple Subsets of a Large Input

Posted by Prasanth J <bu...@gmail.com>.
Hi Thomas

It looks like you are trying to count unique users for multiple combinations of dimensions. If so, then you can make use of the CUBE operator. Since counting unique users is not an algebraic measure (it is a holistic measure), it may result in a very slow reducer. There is a JIRA which addresses this issue: https://issues.apache.org/jira/browse/PIG-2831. If this is what you are looking for, then I will look into this JIRA as soon as possible.
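
Roughly, a CUBE-based version might look like the untested sketch below (field names are taken from your post; the load/store paths and the per-category handling are placeholders you would still need to fill in):

source = load 'input' as (userId:chararray, dimensionA_key:chararray,
                          dimensionB_key:chararray, dimensionC_key:chararray,
                          activity:chararray);
-- CUBE produces every combination of the listed dimensions;
-- rolled-up dimensions show up as null in the output group key
cubed = CUBE source BY CUBE(dimensionA_key, dimensionB_key, dimensionC_key);
result = foreach cubed {
    -- 'cube' is the bag holding the input tuples for each combination
    uids = DISTINCT cube.userId;
    generate FLATTEN(group) as (dimA, dimB, dimC), COUNT(uids) as unique_users;
}
store result into 'output';

The DISTINCT inside the nested foreach is the holistic part that can overwhelm a single reducer, which is exactly what PIG-2831 is about.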

Thanks
-- Prasanth



Re: Pig Unique Counts on Multiple Subsets of a Large Input

Posted by Thomas Edison <ju...@gmail.com>.
@Alan
I just tried your method, as shown below.  The script gets stuck at the last
reducer even for a relatively small subset of the data and fewer combinations.  I
suspect it's an out-of-memory issue.  If I remember correctly, using a
nested foreach to calculate the unique counts is not a very good idea.  Any
suggestions?  Thanks.

T.E.

source = load ...;
A = foreach source generate dimA, dimB, userId,
    udf.getActivity1UserId(userId, activity) as activity1_userId,
    udf.getActivity2UserId(userId, activity) as activity2_userId,
    udf.getActivity3UserId(userId, activity) as activity3_userId,
    ...
    udf.getActivity10UserId(userId, activity) as activity10_userId;

B = group A by (dimA, dimB);

C = foreach B {
    unique_activity1 = distinct A.activity1_userId;
    unique_activity2 = distinct A.activity2_userId;
    unique_activity3 = distinct A.activity3_userId;
    ...
    unique_activity10 = distinct A.activity10_userId;
    generate FLATTEN(group), COUNT(unique_activity1),
COUNT(unique_activity2), COUNT(unique_activity3), ...,
COUNT(unique_activity10);
}

STORE C...;



Re: Pig Unique Counts on Multiple Subsets of a Large Input

Posted by Thomas Edison <ju...@gmail.com>.
Thanks for the reply.

@Jonathan,
I haven't worked with CUBE before.  I will try to learn it.  Thanks for the
tip.
Currently, to split the activity, I use something like this.

new_relation = FILTER relation BY activity == 'abc' or activity == 'def';

In some cases, it is a one-to-one mapping, but not always.  To my
understanding, the SPLIT keyword does exactly the same thing as what I'm
doing now, correct?
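
I.e., I'm assuming the SPLIT version of that filter would look something like the
sketch below ('xyz' is just a made-up value for a second condition):

SPLIT relation INTO
    new_relation IF (activity == 'abc' OR activity == 'def'),
    other_relation IF (activity == 'xyz');

If I read the docs correctly, a record that matches more than one condition goes
into every matching relation, which would fit the one-to-many activity-to-category
mapping.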

@Alan,
I haven't tried your method yet.  I didn't come up with the UDF approach until I
saw that my old script was taking too much time in the map phase, scanning the
source multiple times.  I will try your method.  I have also attached my old
code at the end, just in case.

I set my reducer count to about 90% of my reducer cap.  I think that is what is
recommended.

It takes about 10-15 waves.

My old script:
source = load ...;

activity_1 = FILTER source BY activity == 'abc' OR activity == 'def';
A_1 = foreach activity_1 generate dimA, dimB, userId;
B_1 = distinct A_1;
C_1 = group B_1 by (dimA, dimB);
D_1 = foreach C_1 generate FLATTEN(group), COUNT(B_1);
STORE...

-- repeat for activity_1, but for other dimension combinations;


activity_2 = FILTER source BY activity == 'abc';
-- repeat whatever was done for activity_1

-- repeat other activities.

Thanks.

T.E.



Re: Pig Unique Counts on Multiple Subsets of a Large Input

Posted by Alan Gates <ga...@hortonworks.com>.
In the script you gave I'd be surprised if it's spending time in the map phase, as the map should be very simple.  It's the reduce phase I'd expect to be very expensive, because your mapping UDF prevents Pig from using the algebraic nature of COUNT (that is, it has to ship all of the records to the reducers, not just the number of records).  If your file is large this will be expensive.  What happens if you switch your script to:

A = load ...
B = foreach A generate dimA, dimB, udf.newUserIdForCategory1(userId, activity) as userId1, ...
C = group B by (dimA, dimB)
D = foreach C generate flatten(group), COUNT(B.userId1), ...

When you said it was taking a long time in the map phase were you trying something like the above?  If so I'd check how long your UDF is taking.  Unless you're reading tons of data on a very small cluster the above should be very fast.  It definitely should not reread the input for each UDF.

Other things to check:
What's your parallel count set at?  That is, how many reducers are you running?
How many waves of maps does this create?  That is, what's the number of maps this produces divided by the number of slots you get on your cluster to run it?
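
For reference, the reducer count can be set either script-wide or per statement
(the value 100 below is just an example):

set default_parallel 100;                   -- script-wide default number of reducers
C = group B by (dimA, dimB) PARALLEL 100;   -- or override it on a single operator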

Alan.



Re: Pig Unique Counts on Multiple Subsets of a Large Input

Posted by Jonathan Coveney <jc...@gmail.com>.
Are you familiar with the CUBE keyword that was relatively recently added?
This sounds like a perfect use case for it. Furthermore, how are you
splitting on activity? There is a SPLIT operator which is perfect for this,
as you can have a different relation for each one.

What I would do is use SPLIT to break the input down into activities, then
write a macro that produces the counts you want and apply it to each of the
split relations.

