You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@mahout.apache.org by Markus Weimer <ma...@weimo.de> on 2010/01/28 20:00:12 UTC

Multiple data-local passes?

Hi,

I have a question about hadoop, which most likely someone in mahout
must have solved before:

Many online ML algorithms require multiple passes over data for best
performance. When putting these algorithms on hadoop, one would want
to run the code close to the data (same machine/rack). Mappers offer
this data-local execution but do not offer means to run multiple times
over the data. Of course, one could run the code outside of the hadoop
mapreduce framework as a HDFS client, but that does not offer the
data-locality advantage, in addition to not being scheduled through
the hadoop schedulers.

How is this solved in mahout?

Thanks for any pointer,

Markus

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi,

> That is quite doable.  Typically, the way that you do this is to buffer the
> data either in memory or on local disk.  Both work fine.  You can munch on
> the data until the cows come home that way.  Hadoop will still schedule your
> tasks and handle failures for you.

Yes, that is an option. Unless the data does not fit onto the local
disk anymore. I could write it to a temp file on hdfs and hope that it
stays "close by" that way. Do you know whether hdfs makes any kind of
guarantees to keep data local to the place where it was created?

Thanks,

Markus

Re: Multiple data-local passes?

Posted by Ted Dunning <te...@gmail.com>.
That is quite doable.  Typically, the way that you do this is to buffer the
data either in memory or on local disk.  Both work fine.  You can munch on
the data until the cows come home that way.  Hadoop will still schedule your
tasks and handle failures for you.

The downside is that you lose communication between chunks of your data.
Sometimes that is fine.  Sometimes it isn't.  The specific case where it is
just fine is where you have multiple map functions that need to be applied
to individual input records.  These can trivially be smashed together into a
single map pass and that is just what frameworks like Pig and Cascading do.

This doesn't help you if you want to have lots of communication or global
summaries, but I think you know that.

On Thu, Jan 28, 2010 at 11:30 AM, Markus Weimer <ma...@weimo.de> wrote:

> In a way, I want a sequential program scheduled through hadoop. I will
> loose the parallelism, but I want to keep data locality, scheduling
> and restart-on-failure.
>



-- 
Ted Dunning, CTO
DeepDyve

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi Ted,

> If you are running SGD on a single node, just open the HDFS files directly.
> You won't have significant benefit to locality unless the files are
> relatively small.

Good point. However, the applicability of it may depend on the network
topology of the cluster:

Reasonably fast implementations of SGD are bandwidth bound even when
reading from local disk on typical machines. Depending on the network
topology of the cluster, the rack-local bandwidth may be an order of
magnitude higher than the bandwidth you get when reading from a node
in another rack. So I believe there is value in data locality for SGD.

Your point is of course universally true for sequential algorithms
that are CPU-bound such as batch learning schemes.

Take care,

Markus

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi Ted,

> The other hacky way is what I suggested... submit a normal job, but make all
> mappers wait until it is their turn.

Whether or not I get away with that one depends on how far the cluster
admins need to run before they get me ;-) Waiting mappers occupy slots
on the cluster which other users may want to use...

Take care,

Markus

Re: Multiple data-local passes?

Posted by Ted Dunning <te...@gmail.com>.
The other hacky way is what I suggested... submit a normal job, but make all
mappers wait until it is their turn.

Same effect.

On Thu, Jan 28, 2010 at 4:55 PM, Markus Weimer <ma...@weimo.de> wrote:

> A somewhat hacky way to implement this is to split the input file into
> files of exactly one block in size and then submitting one map job per
> file.
>



-- 
Ted Dunning, CTO
DeepDyve

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi,

> No.  I was referring to the fact that the locality of any given large file
> is about the same from any node because there are many blocks and they get
> spattered all over.

Yes, that is true when referring to my comment of running the code on
one carefully chosen box. I agree that there is no such box that is
"closer" to the data on average.

What Jake & I were discussing is not to find this one box, but to use
multiple boxes for the learning, but sequentially: The code is sent to
the machine that holds the block of the data we are interested in, and
there is always one machine that fulfills this criterion. Once that
machine is done with learning, the algorithm & current model is
shipped to the machine which holds the next block of the file. That
way, the model gets shipped through the network, but not the data.

A somewhat hacky way to implement this is to split the input file into
files of exactly one block in size and then submitting one map job per
file.

Markus

Re: Multiple data-local passes?

Posted by Ted Dunning <te...@gmail.com>.
No.  I was referring to the fact that the locality of any given large file
is about the same from any node because there are many blocks and they get
spattered all over.

Only with a small file is this rough and random symmetry broken.  For files
< 1 block, there are typically three privileged nodes from the standpoint of
locality.

Another approach would be to set the replication on the SGD input somewhat
higher than normal.  This would make many nodes have lots of local blocks
but wouldn't change the fact that nodes are all, on average, the same with
respect to locality.

On Thu, Jan 28, 2010 at 3:17 PM, Jake Mannix <ja...@gmail.com> wrote:

> > If you are running SGD on a single node, just open the HDFS files
> directly.
> > You won't have significant benefit to locality unless the files are
> > relatively small.
> >
>
> You mean relatively large, right?




-- 
Ted Dunning, CTO
DeepDyve

Re: Multiple data-local passes?

Posted by Ted Dunning <te...@gmail.com>.
Jake, I think you have a good idea here.

I wonder if this effect could be achieved by having each mapper simply block
until it gets the word from some coordination layer that it is time for it
to read the current state of the SGD from a shared store somewhere.
Something like ZK would make a fine coordination layer.  Handing the state
around efficiently will take some thought, though.  Putting it in HDFS is
probably a bit slow.  Sending it directly to the next worker seems fragile.
If it is relatively small then putting it in ZK might even work.


On Thu, Jan 28, 2010 at 3:17 PM, Jake Mannix <ja...@gmail.com> wrote:

> Hadoop isn't doing real parallism via this approach, but is sending
> your process to where your data is, which is a lot better than opening
> up a hook into one big HDFS stream and slurping down the entire set
> locally, I'd imagine, given that he says that network latency is the
> bottleneck when he streams data.
>



-- 
Ted Dunning, CTO
DeepDyve

Re: Multiple data-local passes?

Posted by Jake Mannix <ja...@gmail.com>.
On Thu, Jan 28, 2010 at 3:01 PM, Ted Dunning <te...@gmail.com> wrote:

> Aha....
>
> If you are running SGD on a single node, just open the HDFS files directly.
> You won't have significant benefit to locality unless the files are
> relatively small.
>

You mean relatively large, right?


> With a single node solution, you gain little from Hadoop.  The need for
> restarts and such really provide large advantage when you have many nodes
> participating in the computation.
>

If he's got an N-node cluster, and each 1/N's worth of his data set takes
more than maybe 5-10 minutes to process per pass, then the overhead
would be fairly minimal.  Of course, knowing how fast SGD is, there'd
need to be a lot of data to take 10 minutes to process only a fraction of
a single pass through...

Hadoop isn't doing real parallism via this approach, but is sending
your process to where your data is, which is a lot better than opening
up a hook into one big HDFS stream and slurping down the entire set
locally, I'd imagine, given that he says that network latency is the
bottleneck when he streams data.

  -jake

Re: Multiple data-local passes?

Posted by Ted Dunning <te...@gmail.com>.
Aha....

If you are running SGD on a single node, just open the HDFS files directly.
You won't have significant benefit to locality unless the files are
relatively small.

With a single node solution, you gain little from Hadoop.  The need for
restarts and such really provide large advantage when you have many nodes
participating in the computation.

On Thu, Jan 28, 2010 at 1:37 PM, Markus Weimer <ma...@weimo.de> wrote:

> It would be
> neat if the actual learning could be done on the cluster as well, if
> only on a single, carefully chosen node close to the data.
>



-- 
Ted Dunning, CTO
DeepDyve

Re: Multiple data-local passes?

Posted by Jake Mannix <ja...@gmail.com>.
On Thu, Jan 28, 2010 at 2:06 PM, Markus Weimer <ma...@weimo.de> wrote:

> Hi Jake
>
> > Well let me see what we would imagine is going on: your
> > data lives all over HDFS, because it's nice and big.  The
> > algorithm wants to run over the set in a big streamy fashion.
>
> Yes, that sums it up nicely. The important part is to be able to
> stream over the data several times.
>

Yeah, I recently committed a stream-oriented SVD impl which
ideally would do the same thing - it's non-parallelizable, but
fast on a streaming set (although I'm not sure how long it
would take to converge on a set which was so big that it
didn't fit on a single box... that's mighty big for a non-parallelized
algorithm...)


> > You clearly don't want to move your multi-TB dataset
> > around, but moving the 0.5GB model state around is
> > ok, yes?
>
> Yes! That is an awesome idea to minimize the I/O on the system.
> However, it does not yet address the issue of multiple passes over the
> data. But that can easily be done by handing around the 0.5GB another
> time.


> I could store the 0.5GB on HDFS, where it is read from the mapper in
> setup(), updated in map() over the data and stored again in cleanup(),
> together with some housekeeping data about which InputSplit was
> processed. The driver program could then orchestrate the different
> mappers and manage the global state of this procedure. Now I only need
> to figure out how to do this ;-)
>

Yeah, the nice thing about this hack is that you could reorder the shards
sometimes too, on any given pass, if order of input points wasn't
important (why would you want to do this?  maybe the node which has
some of your data is heavily loaded by someone else's single-point-of-
failure reducer, so you instead save that shard for later.  Not sure how
you do that on Hadoop, but it would be cool!).

Since we've got some SGD coming down the pipe in Mahout, in
particular, we'd love to hear how you end up going with this!

  -jake

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi Jake

> Well let me see what we would imagine is going on: your
> data lives all over HDFS, because it's nice and big.  The
> algorithm wants to run over the set in a big streamy fashion.

Yes, that sums it up nicely. The important part is to be able to
stream over the data several times.

> At any given point if it's done processing local stuff, it can
> output 0.5GB of state and pick up that somewhere else to
> continue, is that correct?

Yes, with some mild constraints on "somewhere" that could be handled
by writing my own InputFormat and InputSplit classes.

> You clearly don't want to move your multi-TB dataset
> around, but moving the 0.5GB model state around is
> ok, yes?

Yes! That is an awesome idea to minimize the I/O on the system.
However, it does not yet address the issue of multiple passes over the
data. But that can easily be done by handing around the 0.5GB another
time.

I could store the 0.5GB on HDFS, where it is read from the mapper in
setup(), updated in map() over the data and stored again in cleanup(),
together with some housekeeping data about which InputSplit was
processed. The driver program could then orchestrate the different
mappers and manage the global state of this procedure. Now I only need
to figure out how to do this ;-)

Thanks again,

Markus

Re: Multiple data-local passes?

Posted by Jake Mannix <ja...@gmail.com>.
On Thu, Jan 28, 2010 at 1:37 PM, Markus Weimer <ma...@weimo.de> wrote:
>
> >
> > How does network bandwidth come into play in a "local" solution?
>
> Data may not fit on one disk and must be streamed through the network
> to the learning algorithm. If the data does indeed fit onto one disk,
> the algorithm becomes disk bandwidth bound.
>

Ok, understand this part now, ok.


> There is no parallelism to be exploited: I'm doing SGD-style learning.
> As the parallelization thereof is a largely unsolved problem, the
> learning is strictly sequential.  The desire to run it on a hadoop
> cluster stems from the fact that data preprocessing and the
> application of the learned model is a perfect fit for it. It would be
> neat if the actual learning could be done on the cluster as well, if
> only on a single, carefully chosen node close to the data.
>

Well let me see what we would imagine is going on: your
data lives all over HDFS, because it's nice and big.  The
algorithm wants to run over the set in a big streamy fashion.

At any given point if it's done processing local stuff, it can
output 0.5GB of state and pick up that somewhere else to
continue, is that correct?

You clearly don't want to move your multi-TB dataset
around, but moving the 0.5GB model state around is
ok, yes?

It seems like what you'd want to do is pass that state
info around your cluster, sequentially using one node
at a time to process chunks of your data set, I'm just
not sure what sort of non-hacky way there is to do this
in Hadoop.  Simple hack: split up your set manually
into a bunch of smaller (small enough for one disk)
non-splittable files, and then have the same job get
repeated over and over again (with different input
sources), each time it finishes it outputs state to
HDFS, and each time it starts, the mapper slurps down
the state from HDFS.  This latter mini-shuffle is a
little inefficient (probably two remote copies are
done), but it's a fairly small amount of data that
is being transferred, and hopefully IO would no longer
be the bottleneck.

  -jake




> Thanks,
>
> Markus
>

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi,

>> And that's what I'd
>> like to avoid. Current ("local") solutions are usually limited by the
>> network bandwidth, and hadoop offers some relief on that.
>>
>
> How does network bandwidth come into play in a "local" solution?

Data may not fit on one disk and must be streamed through the network
to the learning algorithm. If the data does indeed fit onto one disk,
the algorithm becomes disk bandwidth bound.

>> In a way, I want a sequential program scheduled through hadoop. I will
>> loose the parallelism, but I want to keep data locality, scheduling
>> and restart-on-failure.
>
>
> You're still doing things partially parallelized, right?  Because your
> input
> data set is large enough to need to be split between machines, and your
> algorithm can work on each chunk independently?  Or is this not
> the case?

There is no parallelism to be exploited: I'm doing SGD-style learning.
As the parallelization thereof is a largely unsolved problem, the
learning is strictly sequential.  The desire to run it on a hadoop
cluster stems from the fact that data preprocessing and the
application of the learned model is a perfect fit for it. It would be
neat if the actual learning could be done on the cluster as well, if
only on a single, carefully chosen node close to the data.

Thanks,

Markus

Re: Multiple data-local passes?

Posted by Jake Mannix <ja...@gmail.com>.
On Thu, Jan 28, 2010 at 11:30 AM, Markus Weimer <ma...@weimo.de> wrote:

> Hi Jake,
>
> >  Are you saying you want something more sophisticated than just setting
> > your number of reducers equal to zero and then repeatedly running your
> > Map(minus Reduce) job on Hadoop?  The Mappers will go where the
> > data is, as you say, but if your mapper output then needs to be collected
> > in some place and aggregated, you'll need the shuffle + Reduce step.
>
> Yes, I'd like to be more sophisticated than that. Assume that the
> output of each mapper is 512MB of doubles. Then, writing these to hdfs
> and shuffing, reducing & re-reading them in the next pass easily
> dominates the overall runtime of the algorithm.


So trying to think more specifically  - your output from the mapper - is
this
0.5GB set of doubles needed by any other mapper in subsequent iterations
of your algorithm?  If it's only needed by the data local to the current
node,
you can do as Ted says and write to local disk, reading it back up on the
next pass.


> And that's what I'd
> like to avoid. Current ("local") solutions are usually limited by the
> network bandwidth, and hadoop offers some relief on that.
>

How does network bandwidth come into play in a "local" solution?


> In a way, I want a sequential program scheduled through hadoop. I will
> loose the parallelism, but I want to keep data locality, scheduling
> and restart-on-failure.


You're still doing things partially parallelized, right?  Because your
input
data set is large enough to need to be split between machines, and your
algorithm can work on each chunk independently?  Or is this not
the case?

   -jake

Re: Multiple data-local passes?

Posted by Markus Weimer <ma...@weimo.de>.
Hi Jake,

>  Are you saying you want something more sophisticated than just setting
> your number of reducers equal to zero and then repeatedly running your
> Map(minus Reduce) job on Hadoop?  The Mappers will go where the
> data is, as you say, but if your mapper output then needs to be collected
> in some place and aggregated, you'll need the shuffle + Reduce step.

Yes, I'd like to be more sophisticated than that. Assume that the
output of each mapper is 512MB of doubles. Then, writing these to hdfs
and shuffing, reducing & re-reading them in the next pass easily
dominates the overall runtime of the algorithm. And that's what I'd
like to avoid. Current ("local") solutions are usually limited by the
network bandwidth, and hadoop offers some relief on that.

In a way, I want a sequential program scheduled through hadoop. I will
loose the parallelism, but I want to keep data locality, scheduling
and restart-on-failure.

Thanks,

Markus

Re: Multiple data-local passes?

Posted by Jake Mannix <ja...@gmail.com>.
Hi Markus,

  Are you saying you want something more sophisticated than just setting
your number of reducers equal to zero and then repeatedly running your
Map(minus Reduce) job on Hadoop?  The Mappers will go where the
data is, as you say, but if your mapper output then needs to be collected
in some place and aggregated, you'll need the shuffle + Reduce step.

If your mapper output is very small (you're reading in the data set
multiple times, but training up a very small model), then you might
need the reducers and can transmit the model via side channel methods
to all the nodes for the next pass.

Is this the kind of thing you're talking about?

  -jake

On Thu, Jan 28, 2010 at 11:00 AM, Markus Weimer
<ma...@weimo.de>wrote:

> Hi,
>
> I have a question about hadoop, which most likely someone in mahout
> must have solved before:
>
> Many online ML algorithms require multiple passes over data for best
> performance. When putting these algorithms on hadoop, one would want
> to run the code close to the data (same machine/rack). Mappers offer
> this data-local execution but do not offer means to run multiple times
> over the data. Of course, one could run the code outside of the hadoop
> mapreduce framework as a HDFS client, but that does not offer the
> data-locality advantage, in addition to not being scheduled through
> the hadoop schedulers.
>
> How is this solved in mahout?
>
> Thanks for any pointer,
>
> Markus
>

Re: Multiple data-local passes?

Posted by Robin Anil <ro...@gmail.com>.
Another algo where this came in to picture was PFPgrowth. Where in the
reducer. I needed 2 pass over the data first to count frequencies and second
to build the graph using that information. There I kept the data in memory
in a compressed form and reused it and ensured a reduce chunk doesnt get
much data that will cause it to go out of memory. But chances are it will on
some long chain data. The algorithm wasnt designed for long chained
transactions anyway

Robin

On Fri, Jan 29, 2010 at 12:41 AM, Robin Anil <ro...@gmail.com> wrote:

> Glad that you asked because I have been asking the same question myself
> when creating a Text->Vector convertor where i need to iterate over the same
> data converting them to vectors using a chunk of dictionary at a time. If i
> had the option of running multiple passes. It would have taken me just a
> single mapreduce. Here i have to do 1 pass over the data for every chunk of
> dictionary in memory.  True, I can run n sequential job using a HDFS client
> on different servers. The network data transfer  wasn't worth it.
>
> Robin
>
> On Fri, Jan 29, 2010 at 12:30 AM, Markus Weimer <mailinglists2008@weimo.de
> > wrote:
>
>> Hi,
>>
>> I have a question about hadoop, which most likely someone in mahout
>> must have solved before:
>>
>> Many online ML algorithms require multiple passes over data for best
>> performance. When putting these algorithms on hadoop, one would want
>> to run the code close to the data (same machine/rack). Mappers offer
>> this data-local execution but do not offer means to run multiple times
>> over the data. Of course, one could run the code outside of the hadoop
>> mapreduce framework as a HDFS client, but that does not offer the
>> data-locality advantage, in addition to not being scheduled through
>> the hadoop schedulers.
>>
>> How is this solved in mahout?
>>
>> Thanks for any pointer,
>>
>> Markus
>>
>
>
>
> --
> ------
> Robin Anil
> Blog: http://techdigger.wordpress.com
> -------
>
> Mahout in Action - Mammoth Scale machine learning
> Read Chapter 1 - Its Frrreeee
> http://www.manning.com/owen
>
> Try out Swipeball for iPhone
> http://itunes.com/apps/swipeball
>

Re: Multiple data-local passes?

Posted by Robin Anil <ro...@gmail.com>.
Glad that you asked because I have been asking the same question myself when
creating a Text->Vector convertor where i need to iterate over the same data
converting them to vectors using a chunk of dictionary at a time. If i had
the option of running multiple passes. It would have taken me just a single
mapreduce. Here i have to do 1 pass over the data for every chunk of
dictionary in memory.  True, I can run n sequential job using a HDFS client
on different servers. The network data transfer  wasn't worth it.

Robin

On Fri, Jan 29, 2010 at 12:30 AM, Markus Weimer
<ma...@weimo.de>wrote:

> Hi,
>
> I have a question about hadoop, which most likely someone in mahout
> must have solved before:
>
> Many online ML algorithms require multiple passes over data for best
> performance. When putting these algorithms on hadoop, one would want
> to run the code close to the data (same machine/rack). Mappers offer
> this data-local execution but do not offer means to run multiple times
> over the data. Of course, one could run the code outside of the hadoop
> mapreduce framework as a HDFS client, but that does not offer the
> data-locality advantage, in addition to not being scheduled through
> the hadoop schedulers.
>
> How is this solved in mahout?
>
> Thanks for any pointer,
>
> Markus
>



-- 
------
Robin Anil
Blog: http://techdigger.wordpress.com
-------

Mahout in Action - Mammoth Scale machine learning
Read Chapter 1 - Its Frrreeee
http://www.manning.com/owen

Try out Swipeball for iPhone
http://itunes.com/apps/swipeball