Posted to user@pig.apache.org by Rodrigo Ferreira <we...@gmail.com> on 2014/07/12 20:24:05 UTC

What really happens after a group?

Hi everyone,

I have a question.

As far as I understood from the book "Programming Pig", after a GROUP all
the records with the same key go to the same reducer. So far so good.
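
A rough plain-Java sketch of that idea (not Pig's actual code), assuming hash partitioning with the same formula as Hadoop's default HashPartitioner. The class and method names are hypothetical. It shows why a given key always lands on one reducer, while a single reducer may still be assigned several different keys:

```java
import java.util.List;

// Hypothetical illustration class; not part of Pig or Hadoop.
final class PartitionSketch {

    // Same formula as Hadoop's default HashPartitioner: clear the sign bit
    // of the hash, then take the remainder by the number of reducers.
    static int reducerFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 2; // plays the role of PARALLEL 2 (assumption)
        for (String key : List.of("key1", "key2", "key1", "key1")) {
            // Every occurrence of the same key maps to the same reducer index.
            System.out.println(key + " -> reducer " + reducerFor(key, numReducers));
        }
    }
}
```

Sharing a reducer does not mean sharing a bag: the reducer still handles its keys one at a time.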

This allows us to write a statement like this:

*foreach grpd generate group, COUNT(input)*

which should count the elements *per* key.
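
To make "count per key" concrete, here is a small plain-Java sketch (not Pig code) of what that statement computes. Modelling records as lists of strings, the class name CountPerKeySketch, and the sample rows are assumptions made only for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; it does not use the Pig API.
final class CountPerKeySketch {

    // Counts how many records share each key (the first field of a record).
    static Map<String, Long> countPerKey(List<List<String>> records) {
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> record : records) {
            counts.merge(record.get(0), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<List<String>> records = List.of(
                List.of("key1", "d1", "d2"),
                List.of("key2", "d1", "d3"),
                List.of("key1", "d1", "d4"),
                List.of("key1", "d1", "d5"));
        System.out.println(countPerKey(records)); // prints {key1=3, key2=1}
    }
}
```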

Then comes my issue. I have a script like this:

B = GROUP A BY key PARALLEL p;
C = FILTER B BY NOT IsEmpty(A);
D = FOREACH C GENERATE FLATTEN(MyFunction(A)) AS (mySchema);

If I go through all the tuples in the bag handed to *MyFunction*, I see
elements with different keys (although they are sorted)! Am I doing
something wrong? What am I missing here?

So far, I'm working around this by checking when the key changes and then
computing my results on a per-key basis. But I'm not sure whether this is OK
or just a hack.

Thank you!

Rodrigo Ferreira.

Re: What really happens after a group?

Posted by Rodrigo Ferreira <we...@gmail.com>.
Hi Suraj, I can't run this test today, but I've managed to at least take a
look at my code.

You are right. My wrong impression came from the fact that my UDF keeps some
class state across bags, and because of that I got confused about what was
really happening. Thank you for your help. I'll be more careful next time.
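
For anyone hitting the same thing, here is a plain-Java sketch of the pitfall. It is only an approximation of what happened in my code: the class StatefulUdfSketch and its methods are hypothetical and do not use the real Pig API. The assumption is that one UDF instance is reused for several bags, so a field that is never cleared still holds the previous bag's keys:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a UDF; it does not use the real Pig API.
final class StatefulUdfSketch {

    // BUG: this field outlives a single call, so keys collected from an
    // earlier bag are still present when the next bag is processed.
    private final List<String> seenKeys = new ArrayList<>();

    // Leaky version: accumulates into the instance field on every call.
    List<String> execLeaky(List<List<String>> bag) {
        for (List<String> tuple : bag) {
            seenKeys.add(tuple.get(0));
        }
        return new ArrayList<>(seenKeys);
    }

    // Fixed version: all working state is local to one call.
    static List<String> execClean(List<List<String>> bag) {
        List<String> keys = new ArrayList<>();
        for (List<String> tuple : bag) {
            keys.add(tuple.get(0));
        }
        return keys;
    }

    public static void main(String[] args) {
        List<List<String>> bag1 = List.of(
                List.of("key1", "d1", "d2"),
                List.of("key1", "d1", "d4"),
                List.of("key1", "d1", "d5"));
        List<List<String>> bag2 = List.of(List.of("key2", "d1", "d3"));

        StatefulUdfSketch udf = new StatefulUdfSketch();
        udf.execLeaky(bag1);
        System.out.println(udf.execLeaky(bag2)); // prints [key1, key1, key1, key2]
        System.out.println(execClean(bag2));     // prints [key2]
    }
}
```

The leaky output looks as if one bag contained two sorted keys, although each call received a single-key bag.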

Thanks a lot.

Rodrigo.


2014-07-13 16:30 GMT+02:00 Suraj Nayak M <sn...@gmail.com>:


Re: What really happens after a group?

Posted by Suraj Nayak M <sn...@gmail.com>.
Rodrigo,

I tried the following to reproduce the problem. In my UDF *GroupTest*, I
was getting data for one key at a time (see *LOG* below). The UDF code,
Pig code, input data, and log output are below. Let me know if I have
missed anything.

*UDF Code :* (I intentionally added System.out.println calls to the UDF to
check the bag size in the log)
package com.pigtutorial.ch01;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

public class GroupTest extends EvalFunc<DataBag> {

    @Override
    public DataBag exec(Tuple input) throws IOException {
        // Nothing to do for a missing or empty argument.
        if (input == null || input.size() == 0 || input.get(0) == null)
            return null;

        DataBag returnBag = BagFactory.getInstance().newDefaultBag();
        try {
            // The first argument is the bag of records grouped under one key.
            DataBag bag = DataType.toBag(input.get(0));
            System.out.println("Calling UDF with databag size : " + bag.size());

            // DataBag is Iterable<Tuple>: log each tuple and copy it to the output.
            for (Tuple t : bag) {
                System.out.println(t);
                returnBag.add(t);
            }
        } catch (Exception e) {
            throw new IOException("Caught exception processing input row ", e);
        }
        return returnBag;
    }

}

*Pig Code :*
a = LOAD 'src/resources/data/input/data_input'
    USING PigStorage(',')
    AS (KEY:chararray, VAL1:chararray, VAL2:chararray);

b = GROUP a
    BY KEY
    PARALLEL 2;

c = FILTER b
    BY NOT IsEmpty(a);

d = FOREACH c
    GENERATE FLATTEN(com.pigtutorial.ch01.GroupTest(a));

-- Store d (not a), so that the GROUP and the UDF are actually executed.
STORE d
    INTO 'src/resources/data/actual_output/group_output'
    USING PigStorage(',');

*INPUT :*
key1,d1,d2
key2,d1,d3
key1,d1,d4
key1,d1,d5

*LOG :*
Calling UDF with databag size : 3
(key1,d1,d2)
(key1,d1,d4)
(key1,d1,d5)
Calling UDF with databag size : 1
(key2,d1,d3)

--
Thanks
Suraj Nayak

On Sunday 13 July 2014 04:36 AM, Rodrigo Ferreira wrote:


Re: What really happens after a group?

Posted by Rodrigo Ferreira <we...@gmail.com>.
(answering again but now including the mailing list :P)

Thank you for your answer Suraj.

What you said is exactly what I expect, but I get something different.

Using your example (the specific data is not important here), my UDF
receives more than one key in the same bag, in sorted order. Here's a sample
of the code of my UDF:


DataBag bag = DataType.toBag(input.get(0));

Iterator it = bag.iterator();

while (it.hasNext()) {
    Tuple t = (Tuple) it.next();
    // Here I print the attribute used as the grouping key
}


What I get in the output is:

key1
key1
key1
key2

The point is that I'm using test data that is not really big (less than
64MB). In any case, Pig shouldn't put these keys together in the same bag!
Maybe this is some kind of optimization that I should turn off.


2014-07-12 23:29 GMT+02:00 Suraj Nayak <sn...@gmail.com>:


Re: What really happens after a group?

Posted by Suraj Nayak <sn...@gmail.com>.
Are you processing the bag in the UDF?

Can you send sample records that are going into the UDF, using the DUMP
command on alias C?

If the data (alias A) is:
(key1,d1,d2)
(key2,d1,d3)
(key1,d1,d4)
(key1,d1,d5)

After grouping on the first column, the data should look like this (one
record per key, holding the key and a bag of its records):

(key1,{(key1,d1,d2),(key1,d1,d4),(key1,d1,d5)})
(key2,{(key2,d1,d3)})

If you pass the bag A to the UDF, each call should receive a bag that holds
only the records for a single key.
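
As a rough plain-Java sketch of that grouping (not how Pig implements it), the snippet below builds one "bag" per key from the sample data above. The class name GroupSketch and the use of lists of strings for tuples are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; it does not use the Pig API.
final class GroupSketch {

    // Groups rows by their first field; each map value plays the role of a bag.
    static Map<String, List<List<String>>> groupByFirstField(List<List<String>> rows) {
        Map<String, List<List<String>>> groups = new TreeMap<>();
        for (List<String> row : rows) {
            groups.computeIfAbsent(row.get(0), k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<String>> rows = List.of(
                List.of("key1", "d1", "d2"),
                List.of("key2", "d1", "d3"),
                List.of("key1", "d1", "d4"),
                List.of("key1", "d1", "d5"));

        // A UDF applied after the GROUP would be called once per entry here.
        for (Map.Entry<String, List<List<String>>> e : groupByFirstField(rows).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
        // prints:
        // key1 -> [[key1, d1, d2], [key1, d1, d4], [key1, d1, d5]]
        // key2 -> [[key2, d1, d3]]
    }
}
```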

--
Suraj Nayak