Posted to user@pig.apache.org by Vishal Santoshi <vi...@gmail.com> on 2010/05/24 18:41:41 UTC

Creating a SequentialFileLoader

I have this working, so I am seeking validation and corrections.
We have SequenceFiles with various custom Writables in Hadoop, and we want
to be able to work with them from within Pig.

I have taken PigStorage and the piggybank SequenceFileLoader as a template,
and added pluggable converters that are fed through the SequenceFileLoader
(which has a default). Below is part of the Java file.

public class SequenceFileLoader extends FileInputLoadFunc implements LoadPushDown {

    public SequenceFileLoader() {
        // Default converter handles (Text, NullWritable) records.
        converter = new TextConverter();
    }

    @SuppressWarnings("unchecked")
    public SequenceFileLoader(String customWritableToTupleBaseConverter)
            throws FrontendException {
        try {
            // Instantiate the pluggable converter named in the DEFINE call.
            converter = (CustomWritableToTupleBaseConverter)
                    Class.forName(customWritableToTupleBaseConverter).newInstance();
        } catch (Exception e) {
            throw new FrontendException(e);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public Tuple getNext() throws IOException {
        if (!mRequiredColumnsInitialized) {
            if (signature != null) {
                // Read back the projection that was pushed down on the frontend.
                Properties p = UDFContext.getUDFContext()
                        .getUDFProperties(this.getClass());
                mRequiredColumns = (boolean[]) ObjectSerializer
                        .deserialize(p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }

        boolean next = false;
        try {
            next = reader.nextKeyValue();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
        if (!next) return null;

        key = reader.getCurrentKey();
        value = reader.getCurrentValue();
        // Delegate the Writable-to-Pig-type conversion to the pluggable converter.
        converter.populateTupleList(key, value, mRequiredColumns, mProtoTuple);
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;
    }
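
For context, the mRequiredColumns array that getNext() reads back is written on
the frontend by the LoadPushDown half of the loader, which is not shown in the
snippet above. A minimal sketch of that method, modeled on how PigStorage does
it in Pig 0.7.0 (an assumption about this loader, not the author's actual code):

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException {
        if (requiredFieldList == null || requiredFieldList.getFields() == null)
            return null;
        // Flag every requested column index, sized to the highest index asked for.
        int last = -1;
        for (RequiredField rf : requiredFieldList.getFields())
            if (rf.getIndex() > last) last = rf.getIndex();
        mRequiredColumns = new boolean[last + 1];
        for (RequiredField rf : requiredFieldList.getFields())
            if (rf.getIndex() != -1) mRequiredColumns[rf.getIndex()] = true;
        // Stash the array in the UDFContext, keyed by this loader's signature,
        // so that getNext() can deserialize it on the backend.
        Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
        try {
            p.setProperty(signature, ObjectSerializer.serialize(mRequiredColumns));
        } catch (Exception e) {
            throw new FrontendException(e);
        }
        return new RequiredFieldResponse(true);
    }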



and the converter base class:

public abstract class CustomWritableToTupleBaseConverter<K extends Writable,
        V extends Writable> {

    public abstract void populateTupleList(K time, V value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
            throws IOException;
}
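
To illustrate the contract, here is a minimal concrete converter for a
hypothetical (Text, DoubleWritable) SequenceFile; the class name
TextDoubleConverter and the two-column layout are assumptions for
illustration, not part of the actual code:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

// Hypothetical converter: emits one (chararray, double) row per record.
public class TextDoubleConverter
        extends CustomWritableToTupleBaseConverter<Text, DoubleWritable> {

    @Override
    public void populateTupleList(Text time, DoubleWritable value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
            throws IOException {
        // Honor column pruning: add a field only when no projection was
        // pushed down, or when that column was actually requested.
        if (mRequiredColumns == null
                || (mRequiredColumns.length > 0 && mRequiredColumns[0]))
            mProtoTuple.add(time.toString());
        if (mRequiredColumns == null
                || (mRequiredColumns.length > 1 && mRequiredColumns[1]))
            mProtoTuple.add(value.get());
    }
}

Such a converter would then be wired in with something like
DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('x.y.TextDoubleConverter');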



Features

* Allows for a default format (TextConverter)
** (Text, NullWritable)
*** The Text is treated as a COMMA (",") separated text array
**** Consider a Text with values 1, 2, 3:
**** grunt> DEFINE SequenceFileLoader com.medialets.hadoop.pig.SequenceFileLoader()
**** grunt> A = LOAD 'input' USING SequenceFileLoader
**** grunt> B = FOREACH A GENERATE $3
**** grunt> 3
* Allows for custom formats (example: TimeWritableTestLongConverter)
** It is up to the custom converter to provide the SequenceFileLoader with the Writables
*** via public abstract void populateTupleList(K time, V value, boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple) throws IOException; in the base class CustomWritableToTupleBaseConverter
*** The custom converter has to convert its key/value (as specified by the SequenceFile) into a list of Pig-recognizable DataTypes
**** grunt> DEFINE SequenceFileLoader a.b.c.SequenceFileLoader('a.b.b.SomeConverter');
**** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
**** grunt> B = FILTER A BY f7 + 1 > .5;
** Note that Pig has to be told the type of each column for it to do the right conversion. In the above example, if f7 is not defined as double, Pig will try to cast it to an int, as we are adding 1 to the value (see the short sketch after this list).
** Note that the custom converter is an argument passed in the DEFINE call.
* Allows for limiting the number of columns in the input
** grunt> A = LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray, f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
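
To make that typing note concrete, a short sketch (the input path is
illustrative, and the behavior is as described in the note above):

grunt> A = LOAD 'input' USING SequenceFileLoader AS (f7:double);
grunt> B = FILTER A BY f7 + 1 > .5;  -- double arithmetic, since f7 is declared double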


Does anyone see any issues with this approach?

I have chosen the path of least resistance, so any guidance will be
appreciated.

Re: Creating a SequentialFileLoader

Posted by Vishal Santoshi <vi...@gmail.com>.
Thank you, Dmitriy.
I will do the needful. Thanks again for the valuable insight.


Re: Creating a SequentialFileLoader

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Vishal,
The way we handle all development is through creating tickets on the Pig
jira and attaching patches that address those tickets; one of the committers
then reviews the patches and provides feedback or commits the patch to Pig.

More info here: http://wiki.apache.org/pig/HowToContribute

-D


Re: Creating a SequentialFileLoader

Posted by Vishal Santoshi <vi...@gmail.com>.
I will spruce it up; there are a few changes to make the abstraction better.
As it stands, limiting the columns for performance is done in the concrete
impls, which is hardly a good option for reusability.

I am sure, though, that I do not have submit rights to the Pig GitHub.


Re: Creating a SequentialFileLoader

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Vishal,
Now I get it. Looks really good actually. It would be great if you polished
this up and submitted to piggybank.

-D


Re: Creating a SequentialFileLoader

Posted by Vishal Santoshi <vi...@gmail.com>.
Using the SequenceFileInputFormat (or an extension of it):

@Override
public InputFormat getInputFormat() throws IOException {
    return new SequenceFileInputFormat<Writable, Writable>();
}

and

@Override
public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
}




is all that one would need, I think. This is Pig 0.7.0, though.
I have only tried patterns like a/*/xyz*, and they have worked for me.
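
For instance (the path is illustrative), such a glob load might look like:

grunt> A = LOAD 'logs/*/part-*' USING SequenceFileLoader();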

On Mon, May 24, 2010 at 4:05 PM, Edward Capriolo <ed...@gmail.com> wrote:

> Sounds great. I do not need all the things you do, but I do need a
> sequence file loader that will take globs or directories. The current
> loader only lets you load a single file.
>
> On Mon, May 24, 2010 at 4:03 PM, Vishal Santoshi <vi...@gmail.com> wrote:
>
> > All said and done, does this smell like a hack, or is it acceptable for
> > my use case, where I am only interested in making my SequenceFiles and
> > their contents use Pig to the fullest?
> >
> > On Mon, May 24, 2010 at 3:59 PM, Vishal Santoshi <vi...@gmail.com> wrote:
> >
> > > Sorry Dmitriy.
> > >
> > > Let me explain our issue more lucidly. Most of our MR jobs use raw
> > > Hadoop (Java impl) and create SequenceFiles with varying custom
> > > Writables.
> > > PigStorage is limited to text format, and the implementation in
> > > piggybank for SequenceFile loading seems limited, in the sense that it
> > >
> > > * does not provide for custom formats (like a TextPair or a Score
> > >   that may use basic Writables like Text, DoubleWritable, etc.)
> > > * does not provide for type/name mapping (the "AS" clause)
> > > * does not provide for limiting the inputs you may be interested in.
> > >
> > > I want to use a Loader to provide for something like this:
> > >
> > > LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray,
> > > f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
> > >
> > > Now this is well and good and easy to write if we have some standard
> > > (Text, NullWritable) SequenceFile, with the Text having ","-separated
> > > columns (almost a PigStorage, but feeding off a SequenceFile).
> > >
> > > In cases, though, where we have a SequenceFile of
> > > (CustomWritableKey, CustomWritableValue) and we still would like to
> > > extract the raw types and aggregate on them, the above fails, as
> > > chararray, int, etc. are limited to known types (and I may be wrong
> > > here).
> > >
> > > What I therefore tried was to reduce the CustomWritables to their raw
> > > types, using an injectable Converter. This converter takes the
> > > CustomWritables (the key and value of a SequenceFile) and returns an
> > > ArrayList<Object> of the CustomWritables reduced to their base types;
> > > the returned list is used to create the Tuple that has to be returned
> > > from getNext().
> > >
> > > I think this code is more likely to tell the tale:
> > >
> > > http://pastebin.com/QEwMztjU
> > >
> > > On Mon, May 24, 2010 at 3:32 PM, Dmitriy Ryaboy <dvryaboy@gmail.com> wrote:
> > >
> > >> Vishal,
> > >> I am not sure what your question is. Could you describe your goals
> > >> and challenges before pasting in the implementation? It looks like
> > >> the bottom part of your email, with all the comments, got
> > >> malformatted, which may be the source of my confusion.
> > >>
> > >> Also, various services like pastebin and gist work better for code
> > >> sharing, as they can take care of highlighting and things of that
> > >> nature, which is handy for reviews.
> > >>
> > >> Thanks
> > >> -Dmitriy

Re: Creating a SequentialFileLoader

Posted by Edward Capriolo <ed...@gmail.com>.
Sounds great. I do not need all the features you do, but I do need a
sequence file loader that will take globs or directories. The current
loader only lets you load a single file.
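
For what it's worth, here is a minimal sketch of the glob support, assuming
the Pig 0.7 LoadFunc API (everything here is an assumption except that
FileInputFormat expands comma-separated paths, globs, and directories
itself): the loader only has to pass the LOAD location through untouched.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Inside a LoadFunc such as the SequenceFileLoader above. Hadoop resolves
// globs like 'logs/2010-05-*' and whole directories when it lists the
// input, so no extra work is needed in the loader.
@Override
public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
}

With that in place, something like LOAD 'logs/part-*' USING
SequenceFileLoader should work unchanged.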


Re: Creating a SequentialFileLoader

Posted by Vishal Santoshi <vi...@gmail.com>.
All said and done, does this smell like a hack, or is it acceptable for my
use case, where I am only interested in making my SequenceFiles and their
contents fully usable from Pig?


Re: Creating a SequentialFileLoader

Posted by Vishal Santoshi <vi...@gmail.com>.
Sorry, Dmitriy.

Let me explain our issue more lucidly. Most of our MR jobs use raw Hadoop
(the Java API) and create SequenceFiles with varying custom Writables.
PigStorage is limited to text input, and the piggybank implementation for
SequenceFile loading seems limited, in the sense that it

* does not provide for custom formats (like a TextPair or a Score that may
be composed of basic Writables such as Text, DoubleWritable, etc.)
* does not provide for type/name mapping (the "AS" clause)
* does not provide for limiting the input columns you may be interested in
(see the pushProjection sketch after the LOAD example below)

I want to use a loader to provide for something like this:

LOAD 'input' USING SequenceFileLoader AS (f1:chararray, f2:chararray,
f3:long, f4:chararray, f5:chararray, f6:chararray, f7:double);
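
The column limiting is the LoadPushDown side of the loader: pushProjection
records which columns Pig actually needs, and getNext() reads that back as
the mRequiredColumns array. The sketch below is modeled from memory on what
PigStorage does, so treat the details as an approximation rather than the
exact API.

// getFeatures() must also report OperatorSet.PROJECTION, or Pig will not
// call pushProjection at all.
@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
        throws FrontendException {
    if (requiredFieldList == null || requiredFieldList.getFields() == null)
        return null;
    // Size the flag array by the highest requested column index.
    int last = -1;
    for (RequiredField rf : requiredFieldList.getFields())
        last = Math.max(last, rf.getIndex());
    boolean[] required = new boolean[last + 1];
    for (RequiredField rf : requiredFieldList.getFields())
        if (rf.getIndex() != -1)
            required[rf.getIndex()] = true;
    // Stash the flags where getNext() will find them, keyed by signature.
    try {
        UDFContext.getUDFContext().getUDFProperties(this.getClass())
                .setProperty(signature, ObjectSerializer.serialize(required));
    } catch (Exception e) {
        throw new FrontendException(e);
    }
    return new RequiredFieldResponse(true);
}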

Now this is all well and good, and easy to write, if we have a standard
(Text, NullWritable) SequenceFile, with the Text holding ","-separated
columns (almost a PigStorage, but feeding off a SequenceFile).

In cases, though, where we have a SequenceFile of (CustomWritableKey,
CustomWritableValue) and still would like to extract the raw types and
aggregate on them, the above fails, as chararray, int, etc. are limited to
known types (and I may be wrong here).

What I therefore tried was to reduce the CustomWritables to their raw
types, using an injectable converter. The converter takes the
CustomWritables (the key and value of a SequenceFile) and returns an
ArrayList<Object> holding them reduced to their base types; the returned
list is then used to create the Tuple that getNext() has to return.

I think the code itself tells the tale better:

http://pastebin.com/QEwMztjU
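
As a concrete illustration of the converter idea, suppose a SequenceFile of
(TextPair, DoubleWritable), where TextPair is a hypothetical custom
Writable with getFirst()/getSecond() accessors returning Text. Everything
below except the base-class contract is made up for illustration.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.DoubleWritable;

public class TextPairConverter
        extends CustomWritableToTupleBaseConverter<TextPair, DoubleWritable> {

    @Override
    public void populateTupleList(TextPair key, DoubleWritable value,
            boolean[] mRequiredColumns, ArrayList<Object> mProtoTuple)
            throws IOException {
        // Emit only the columns Pig asked for; a null array means "all".
        if (wanted(mRequiredColumns, 0))
            mProtoTuple.add(key.getFirst().toString());
        if (wanted(mRequiredColumns, 1))
            mProtoTuple.add(key.getSecond().toString());
        if (wanted(mRequiredColumns, 2))
            mProtoTuple.add(value.get()); // autoboxes to Double for Pig
    }

    private static boolean wanted(boolean[] cols, int i) {
        return cols == null || (i < cols.length && cols[i]);
    }
}

Wired in through the one-argument constructor via DEFINE, the loader itself
never needs to know about TextPair.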

Re: Creating a SequentialFileLoader

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Vishal,
I am not sure what your question is. Could you describe your goals and
challenges before pasting in the implementation? It looks like the bottom
part of your email, with all the comments, got malformatted, which may be
the source of my confusion.

Also, various services like pastebin and gist work better for code sharing,
as they can take care of highlighting and things of that nature, which is
handy for reviews.

Thanks
-Dmitriy
