Posted to user@pig.apache.org by Jonathan Coveney <jc...@gmail.com> on 2011/01/07 23:34:41 UTC

A custom loader which partitions on a key known to live on a given box?

I will implement this if I need to, but it seems to me that SOMEBODY has to
have run into this. I don't know if it's possible, but it's worth asking...

Basically I have a hadoop cluster of X servers, and one thing that I know is
that for anything with key k, all of the values associated with that key
will live on the same server. I've been told that the way to take advantage
of this is to make a custom loader which extends CollectibleLoader (I think,
it may be called something else), which then lets group operations be done
on the map side.

I know that Zebra implements this, but the cluster at hand is all flat
files, and getting away from that is not an option. Without a special file
format, is there a reasonable way to implement this? Has anyone done
something like this? I think having this in the piggybank or pigloader, if
it's possible, would be super useful for datasets like this.

Thanks for the help
Jon

Re: A custom loader which partitions on a key known to live on a given box?

Posted by Jonathan Coveney <jc...@gmail.com>.
That clarified things perfectly, Dmitriy. As always, super helpful. I think
I just need to look deeper into how the Zebra people did what they did, and
dig into how we're actually storing these things. I imagine that if it were
trivial to implement partitions (and indexes, for that matter) in Pig, it
would have been done already...

2011/1/8 Dmitriy Ryaboy <dv...@gmail.com>

> What do you mean by data being on the same server?
>
> You said "I have a hadoop cluster of X servers, and one thing that I know is
> that for anything with key k, all of the values associated with that key
> will live on the same server."
>
> I am not sure how you achieve this.
>
> Assuming your data is in a file or collection of files in HDFS, the file is
> split up into blocks by the file system. Each block is semi-arbitrarily
> assigned to some node in your cluster (3 nodes, usually, for replication).
> You don't really control how these blocks are set up -- they are based on a
> configured block size setting in HDFS; it's usually 64 or 128 megabytes.
>
> When pig reads your file, it creates a separate map task for each of these
> blocks. If you can somehow guarantee that all the keys wind up in the same
> block, great, you can do what I recommended earlier; however, this is tricky
> -- even if your input file is sorted by the group key, and so all of your
> keys are next to each other, it is possible that a sequence of records with
> the same key happens to span the semi-arbitrary block boundary, and these
> records will wind up in separate map tasks.
>
> As I just explained it, I equated HDFS blocks with map tasks. That's an
> oversimplification. In fact map tasks are created per an InputSplit. What
> happens by default is that your InputFormat looks up the HDFS blocks and
> creates a split per HDFS block; but it could do something different,
> including guaranteeing that all keys are in the same split.  Zebra does
> something with its input format such that when you call
> ensureAllInstances...() on the Zebra Loader, the InputFormat is adjusted in
> a way that, well, ensures all keys are in the same split.
>
> Did that clarify things or confuse even more? :)
>
> D

Re: A custom loader which partitions on a key known to live on a given box?

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
What do you mean by data being on the same server?

You said "I have a hadoop cluster of X servers, and one thing that I know is
that for anything with key k, all of the values associated with that key
will live on the same server."

I am not sure how you achieve this.

Assuming your data is in a file or collection of files in HDFS, the file is
split up into blocks by the file system. Each block is semi-arbitrarily
assigned to some node in your cluster (3 nodes, usually, for replication).
You don't really control how these blocks are set up -- they are based on a
configured block size setting in HDFS; it's usually 64 or 128 megabytes.

When Pig reads your file, it creates a separate map task for each of these
blocks. If you can somehow guarantee that all the records for a given key
wind up in the same block, great, you can do what I recommended earlier;
however, this is tricky -- even if your input file is sorted by the group
key, and so all records with the same key are next to each other, it is
possible that a run of records with the same key happens to span the
semi-arbitrary block boundary, and these records will wind up in separate
map tasks.
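
To make the boundary problem concrete, here is a minimal sketch in plain Java.
It is a toy model only: "blocks" are cut by record count rather than by bytes,
and the class and method names (BlockBoundaryDemo, splitIntoBlocks,
keysSpanningBlocks) are made up for illustration and are not part of Hadoop or
Pig.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model (assumption): fixed-size "blocks" over key-sorted records. */
final class BlockBoundaryDemo {

    /** Cuts the records into consecutive chunks of at most blockSize records. */
    static List<List<String>> splitIntoBlocks(List<String> keys, int blockSize) {
        if (blockSize < 1) {
            throw new IllegalArgumentException("blockSize must be >= 1");
        }
        List<List<String>> blocks = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += blockSize) {
            int end = Math.min(start + blockSize, keys.size());
            blocks.add(new ArrayList<>(keys.subList(start, end)));
        }
        return blocks;
    }

    /** Returns the keys whose records land in more than one block. */
    static Set<String> keysSpanningBlocks(List<List<String>> blocks) {
        Set<String> seenInEarlierBlock = new HashSet<>();
        Set<String> spanning = new LinkedHashSet<>();
        for (List<String> block : blocks) {
            // Deduplicate within the block so only cross-block repeats count.
            for (String key : new LinkedHashSet<>(block)) {
                if (!seenInEarlierBlock.add(key)) {
                    spanning.add(key);
                }
            }
        }
        return spanning;
    }

    public static void main(String[] args) {
        // The input is sorted by key, yet the cut points ignore key boundaries.
        List<String> sortedKeys = Arrays.asList("a", "a", "b", "b", "b", "c", "c", "d");
        List<List<String>> blocks = splitIntoBlocks(sortedKeys, 3);
        System.out.println(blocks);                     // [[a, a, b], [b, b, c], [c, d]]
        System.out.println(keysSpanningBlocks(blocks)); // [b, c]
    }
}
```

Keys b and c each end up in two blocks, so a map task per block would see only
part of each group.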

As I just explained it, I equated HDFS blocks with map tasks. That's an
oversimplification. In fact, one map task is created per InputSplit. What
happens by default is that your InputFormat looks up the HDFS blocks and
creates a split per HDFS block; but it could do something different,
including guaranteeing that all records for a key are in the same split.
Zebra does something with its input format such that when you call
ensureAllInstances...() on the Zebra Loader, the InputFormat is adjusted in
a way that, well, ensures all records for a key are in the same split.
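
As a rough illustration of "doing something different", the sketch below
computes splits over key-sorted records and pushes each split boundary forward
until the current run of equal keys ends. This is not how Zebra does it and it
does not use the Hadoop InputFormat API; it is a plain-Java toy that works on
record counts, and the names KeyAlignedSplitDemo and keyAlignedSplits are
hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (assumption): splits over key-sorted records, aligned to key runs. */
final class KeyAlignedSplitDemo {

    /**
     * Builds splits of roughly targetSize records, but never ends a split in
     * the middle of a run of identical keys. Requires input sorted by key.
     */
    static List<List<String>> keyAlignedSplits(List<String> sortedKeys, int targetSize) {
        if (targetSize < 1) {
            throw new IllegalArgumentException("targetSize must be >= 1");
        }
        List<List<String>> splits = new ArrayList<>();
        int start = 0;
        while (start < sortedKeys.size()) {
            int end = Math.min(start + targetSize, sortedKeys.size());
            // Extend the split while the next record has the same key as the
            // last record already in the split.
            while (end < sortedKeys.size()
                    && sortedKeys.get(end).equals(sortedKeys.get(end - 1))) {
                end++;
            }
            splits.add(new ArrayList<>(sortedKeys.subList(start, end)));
            start = end;
        }
        return splits;
    }

    public static void main(String[] args) {
        List<String> sortedKeys = Arrays.asList("a", "a", "b", "b", "b", "c", "c", "d");
        System.out.println(keyAlignedSplits(sortedKeys, 3)); // [[a, a, b, b, b], [c, c, d]]
    }
}
```

The trade-off is that splits are no longer uniform in size: a very large key
makes a very large split, and a real implementation would also have to find
key boundaries in byte ranges without scanning the whole file.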

Did that clarify things or confuse even more? :)

D

On Sat, Jan 8, 2011 at 3:31 PM, Jonathan Coveney <jc...@gmail.com> wrote:

> I will admit my ignorance on this...what does it mean to be on the same
> split? This is an area I am still getting up to speed on.
>
> As always, I appreciate the help.

Re: A custom loader which partitions on a key known to live on a given box?

Posted by Jonathan Coveney <jc...@gmail.com>.
I will admit my ignorance on this...what does it mean to be on the same
split? This is an area I am still getting up to speed on.

As always, I appreciate the help.

2011/1/8 Dmitriy Ryaboy <dv...@gmail.com>

> Same server or same split?
>
> I don't know how you can guarantee anything about all the data being on the
> same server given that you are working with HDFS.
>
> If you mean same split, then you can do the following:
>
> public class MyStorage extends PigStorage implements CollectableLoadFunc {
>
>  // add constructors here that mimic PigStorage constructors and just call
>  // into super(args)
>  // don't forget the no-arg constructor
>
>  public void ensureAllKeyInstancesInSameSplit() throws IOException {
>    return;
>  }
> }
>
> As a side note -- I wish this method signature returned a boolean, and
> allowed a LoadFunc to decline, indicating that it can't ensure this
> condition, in which case Pig could either display a meaningful message to
> the user or default to a regular group-by. Ashutosh, thoughts?
>
> D

Re: A custom loader which partitions on a key known to live on a given box?

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Same server or same split?

I don't know how you can guarantee anything about all the data being on the
same server given that you are working with HDFS.

If you mean same split, then you can do the following:

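import java.io.IOException;

import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.builtin.PigStorage;

public class MyStorage extends PigStorage implements CollectableLoadFunc {

  // Add constructors here that mimic the PigStorage constructors and just
  // call into super(args). Don't forget the no-arg constructor.

  // Intentionally a no-op: by using this loader you are asserting that the
  // data already has all instances of a key in the same split.
  public void ensureAllKeyInstancesInSameSplit() throws IOException {
    return;
  }
}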

As a side note -- I wish this method signature returned a boolean, and
allowed a LoadFunc to decline, indicating that it can't ensure this
condition, in which case Pig could either display a meaningful message to
the user or default to a regular group-by. Ashutosh, thoughts?
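
A minimal sketch of what that wished-for contract could look like, in plain
Java. Everything here is hypothetical: DecliningCollectableLoader,
tryEnsureAllKeyInstancesInSameSplit, and GroupPlanner are invented names for
illustration and are not part of Pig's API, which declares the method as void.

```java
/** Hypothetical loader contract (NOT Pig's API): the loader may decline. */
interface DecliningCollectableLoader {
    /** Returns true only if all instances of a key are guaranteed to share a split. */
    boolean tryEnsureAllKeyInstancesInSameSplit();
}

/** Hypothetical planner step that picks a group strategy from the loader's answer. */
final class GroupPlanner {

    enum Strategy { MAP_SIDE_COLLECTED, REGULAR_GROUP_BY }

    static Strategy chooseStrategy(DecliningCollectableLoader loader) {
        if (loader.tryEnsureAllKeyInstancesInSameSplit()) {
            return Strategy.MAP_SIDE_COLLECTED;
        }
        // The loader declined: tell the user and fall back to the shuffle-based plan.
        System.err.println(
            "Loader cannot keep all instances of a key in one split; "
            + "falling back to a regular group-by.");
        return Strategy.REGULAR_GROUP_BY;
    }

    public static void main(String[] args) {
        System.out.println(chooseStrategy(() -> true));  // MAP_SIDE_COLLECTED
        System.out.println(chooseStrategy(() -> false)); // REGULAR_GROUP_BY
    }
}
```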

D

On Fri, Jan 7, 2011 at 2:34 PM, Jonathan Coveney <jc...@gmail.com> wrote:

> I will implement this if I need to, but it seems to me that SOMEBODY has to
> have run into this. I don't know if it's possible, but it's worth asking...
>
> Basically I have a hadoop cluster of X servers, and one thing that I know is
> that for anything with key k, all of the values associated with that key
> will live on the same server. I've been told that the way to take advantage
> of this is to make a custom loader which extends CollectibleLoader (I think,
> it may be called something else), which then lets group operations be done
> on the map side.
>
> I know that Zebra implements this, but the cluster at hand is all flat
> files, and getting away from that is not an option. Without a special file
> format, is there a reasonable way to implement this? Has anyone done
> something like this? I think having this in the piggybank or pigloader, if
> it's possible, would be super useful for datasets like this.
>
> Thanks for the help
> Jon
>