Posted to user@pig.apache.org by "Cheung, Po" <po...@ebay.com> on 2011/09/16 09:35:30 UTC

Filtering records by key and date

I am trying to filter a set of records by key and last modified date so that only one record is returned per key with the most recent date.  I have a working script below but wonder if there is a simpler and more elegant way to do this.

Input:
KEY     VALUE   DATE
A       10      2011-01-01 23:59:00
A       11      2011-01-01 23:59:59
A       12      2011-01-01 23:00:59
B       20      2011-02-01 01:00:00
B       21      2011-02-02 01:00:00
C       30      2011-03-01 03:00:00

Output:
A       11      2011-01-01 23:59:59
B       21      2011-02-02 01:00:00
C       30      2011-03-01 03:00:00

REGISTER piggybank.jar;
REGISTER joda-time.jar;

DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();

raw = LOAD 'data.txt' USING PigStorage AS (key:chararray, value:int, date:chararray);

-- Convert date string to Unix time as a new column
data = FOREACH raw GENERATE key, value, date, (long)ISOToUnix(CustomFormatToISO(date, 'yyyy-MM-dd HH:mm:ss')) AS time:long;

grouped = GROUP data BY key;

-- Create a relation with key and max time only
latest = FOREACH grouped GENERATE FLATTEN(group) AS key:chararray, (long) MAX(data.time) AS time:long;

-- Join 'data' with 'latest' on both key and time columns
joined = JOIN data BY (key, time), latest BY (key, time);

-- Project the original columns from the join
result = FOREACH joined GENERATE data::key, data::value, data::date;

DUMP result;

(A,11,2011-01-01 23:59:59)
(B,21,2011-02-02 01:00:00)
(C,30,2011-03-01 03:00:00)
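For readers who want to sanity-check the logic outside Pig, the same max-per-key filter can be sketched in plain Python. This is only an illustration of the script above, with the sample rows hardcoded and `datetime.strptime` standing in for the piggybank `CustomFormatToISO`/`ISOToUnix` conversion:

```python
from datetime import datetime

rows = [
    ("A", 10, "2011-01-01 23:59:00"),
    ("A", 11, "2011-01-01 23:59:59"),
    ("A", 12, "2011-01-01 23:00:59"),
    ("B", 20, "2011-02-01 01:00:00"),
    ("B", 21, "2011-02-02 01:00:00"),
    ("C", 30, "2011-03-01 03:00:00"),
]

def parse(d):
    # Plays the role of CustomFormatToISO + ISOToUnix: string -> comparable time
    return datetime.strptime(d, "%Y-%m-%d %H:%M:%S")

# Pass 1 (the GROUP/MAX step): latest time per key
latest = {}
for key, value, date in rows:
    t = parse(date)
    if key not in latest or t > latest[key]:
        latest[key] = t

# Pass 2 (the JOIN step): keep rows whose time equals the per-key max
result = [(k, v, d) for k, v, d in rows if parse(d) == latest[k]]
print(result)
```

The two passes mirror the two MapReduce-side steps the Pig script pays for: one aggregation and one join back against the full data.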

RE: Filtering records by key and date

Posted by "Cheung, Po" <po...@ebay.com>.
Thanks Marek!  Using ORDER BY instead of MAX, I can simply order by date without having to convert it to epoch at all.

raw = LOAD 'data.txt' USING PigStorage AS (key:chararray, value:int, date:chararray);

rawGroup = GROUP raw BY key;

data = FOREACH rawGroup {
        rawOrdered = ORDER raw BY date DESC;
        rawLimited = LIMIT rawOrdered 1;
        GENERATE
                FLATTEN(rawLimited)
        ;
};
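The reason ordering by the raw date string works here is that the 'yyyy-MM-dd HH:mm:ss' format is zero-padded and runs from the most significant field to the least, so plain lexicographic comparison agrees with chronological order. A quick Python check of that property (sample dates taken from the input above):

```python
from datetime import datetime

dates = [
    "2011-01-01 23:59:00",
    "2011-01-01 23:59:59",
    "2011-01-01 23:00:59",
    "2011-02-01 01:00:00",
]

# Chronological order, via real datetime parsing
chronological = sorted(dates, key=lambda d: datetime.strptime(d, "%Y-%m-%d %H:%M:%S"))

# Plain string order -- what ORDER ... BY date effectively compares
lexicographic = sorted(dates)

print(lexicographic == chronological)  # -> True
```

Note this only holds for fixed-width, big-endian formats like this one; a format such as 'MM/dd/yyyy' would not sort correctly as a string.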

This is really cool.  Thanks a bunch!

Po


RE: Filtering records by key and date

Posted by Marek Miglinski <mm...@seven.com>.
Sorry, I forgot that the epoch column has to be generated before the GROUP rather than inside the nested FOREACH. I used my own UDF here; you can use yours. Code:

raw = LOAD 'data.txt' USING PigStorage AS (key:chararray, value:int, date:chararray); 

rawWithEpoch = FOREACH raw GENERATE key, value, date, DateToEpoch(date, 'yyyy-MM-dd HH:mm:ss') as epoch;

rawGroup = GROUP rawWithEpoch by key;

data = FOREACH rawGroup {
	rawOrdered = ORDER rawWithEpoch BY epoch DESC;
	rawLimited = LIMIT rawOrdered 1;
	GENERATE
		FLATTEN(rawLimited)
	;
};
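In non-Pig terms, the pattern above (group by key, order each bag descending, keep one row) is the classic top-1-per-group query. A rough Python equivalent of that shape, again with the sample rows hardcoded for illustration:

```python
from itertools import groupby

rows = [
    ("A", 10, "2011-01-01 23:59:00"),
    ("A", 11, "2011-01-01 23:59:59"),
    ("A", 12, "2011-01-01 23:00:59"),
    ("B", 20, "2011-02-01 01:00:00"),
    ("B", 21, "2011-02-02 01:00:00"),
    ("C", 30, "2011-03-01 03:00:00"),
]

# GROUP ... BY key (groupby needs the rows pre-sorted by key)
result = []
for key, bag in groupby(sorted(rows, key=lambda r: r[0]), key=lambda r: r[0]):
    # ORDER ... BY epoch DESC; LIMIT 1  ==  take the max of the bag.
    # Comparing the date string directly is safe for this zero-padded format.
    result.append(max(bag, key=lambda r: r[2]))
print(result)
```

Unlike the MAX-plus-JOIN version, this touches the data in a single grouped pass, which is why it reads (and runs) more simply.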


Sincerely,
Marek M.


RE: Filtering records by key and date

Posted by Marek Miglinski <mm...@seven.com>.
You don't need the JOIN at all. Use this instead:

raw = LOAD 'data.txt' USING PigStorage AS (key:chararray, value:int, date:chararray);
rawGroup = GROUP raw BY key;
data = FOREACH rawGroup {
	epoch = (long) ISOToUnix(CustomFormatToISO(date, 'yyyy-MM-dd HH:mm:ss'));
	rawOrdered = ORDER raw BY epoch DESC;
	rawLimited = LIMIT rawOrdered 1;
	GENERATE
		FLATTEN(rawLimited)
	;
};
--STORE data;

And I recommend implementing your own UDF to convert the date straight to epoch (Unix time).
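The conversion such a UDF would perform is small. As a sketch of the logic only (in Python rather than a Java EvalFunc, and assuming the input timestamps are UTC, returning milliseconds to match what the script casts to long):

```python
import calendar
import time

def date_to_epoch(date_str, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a date string and return Unix time in milliseconds.

    Treats the input as UTC -- an assumption; adjust if your data
    is recorded in local time.
    """
    return calendar.timegm(time.strptime(date_str, fmt)) * 1000

print(date_to_epoch("1970-01-01 00:00:01"))  # -> 1000
```

A real Pig UDF would wrap this logic in a Java class extending EvalFunc, doing the parse once per tuple instead of chaining two piggybank calls.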



Sincerely,
Marek M.
