Posted to common-user@hadoop.apache.org by Stuart White <st...@gmail.com> on 2009/03/18 18:26:50 UTC

Coordination between Mapper tasks

I'd like to implement some coordination between Mapper tasks running
on the same node.  I was thinking of using ZooKeeper to provide this
coordination.

I think I remember hearing that MapReduce and/or HDFS use ZooKeeper
under-the-covers.

So, I'm wondering... in my Mappers, if I want distributed
coordination, can I "piggy-back" onto the ZooKeeper instance being
used by the underlying MapRed/HDFS?  The benefit being that I don't
need to create/configure/run my own ZooKeeper instance.

Re: Coordination between Mapper tasks

Posted by Jimmy Lin <ji...@umd.edu>.
Hmmm... sounds odd. Given the same memcached servers (config), the 
hashing should be consistent.
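
For what it's worth, a minimal sketch of a client setup that keeps key-to-server placement stable across runs, assuming your spymemcached version includes KetamaConnectionFactory (hostnames are illustrative):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.KetamaConnectionFactory;
import net.spy.memcached.MemcachedClient;

public class StableClientFactory {

  // Same server list, in the same order, on every run, plus consistent
  // (ketama) hashing, so a given key always maps to the same server.
  // Hostnames are illustrative.
  public static MemcachedClient create() throws IOException {
    List<InetSocketAddress> servers =
        AddrUtil.getAddresses("cache1:11211 cache2:11211");
    return new MemcachedClient(new KetamaConnectionFactory(), servers);
  }
}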

FYI, all code for the experiments described in that tech report is in 
cloud9, the library I use for teaching my courses.  Download at:

http://www.umiacs.umd.edu/~jimmylin/

Hope this helps! (Let me know off list if you need more details)

-Jimmy

Stuart White wrote:
>> You might want to look at a memcached solution some students and I worked
>> out for exactly this problem.
> 
> Thanks, Jimmy!  This paper does exactly describe my problem.
> 
> I started working to implement the memcached solution you describe,
> and I've run into a small problem.  I've described it on the
> spymemcached forum:
> 
> http://groups.google.com/group/spymemcached/browse_thread/thread/7b4d82bca469ed20
> 
> Essentially, it seems the keys are being hashed inconsistently by
> spymemcached across runs.  This, of course, will result in
> inconsistent/invalid results.
> 
> Did you guys run into this?  Since I'm new to memcached, I'm hoping
> that this is simply something I don't understand or am overlooking.
> 

Re: Coordination between Mapper tasks

Posted by Stuart White <st...@gmail.com>.
> You might want to look at a memcached solution some students and I worked
> out for exactly this problem.

Thanks, Jimmy!  This paper does exactly describe my problem.

I started working to implement the memcached solution you describe,
and I've run into a small problem.  I've described it on the
spymemcached forum:

http://groups.google.com/group/spymemcached/browse_thread/thread/7b4d82bca469ed20

Essentially, it seems the keys are being hashed inconsistently by
spymemcached across runs.  This, of course, will result in
inconsistent/invalid results.

Did you guys run into this?  Since I'm new to memcached, I'm hoping
that this is simply something I don't understand or am overlooking.

Re: Coordination between Mapper tasks

Posted by Jimmy Lin <ji...@umd.edu>.
Hi Stuart,

You might want to look at a memcached solution some students and I 
worked out for exactly this problem.  It's written up in:

Jimmy Lin, Anand Bahety, Shravya Konda, and Samantha Mahindrakar. 
Low-Latency, High-Throughput Access to Static Global Resources within 
the Hadoop Framework. Technical Report HCIL-2009-01, University of 
Maryland, College Park, January 2009.

Available at:

http://www.umiacs.umd.edu/~jimmylin/publications/by_year.html

Best,
Jimmy

Stuart White wrote:
> Thanks to everyone for your feedback.  I'm unfamiliar with many of the
> technologies you've mentioned, so it may take me some time to digest
> all your responses.  The first thing I'm going to look at is Ted's
> suggestion of a pure map-reduce solution by pre-joining my data with
> my lookup values.
> 
> On Fri, Mar 20, 2009 at 9:55 AM, Owen O'Malley <ow...@gmail.com> wrote:
>> On Thu, Mar 19, 2009 at 6:42 PM, Stuart White <st...@gmail.com>wrote:
>>
>>> My process requires a large dictionary of terms (~ 2GB when loaded
>>> into RAM).  The terms are looked-up very frequently, so I want the
>>> terms memory-resident.
>>>
>>> So, the problem is, I want 3 processes (to utilize CPU), but each
>>> process requires ~2GB, but my nodes don't have enough memory to each
>>> have their own copy of the 2GB of data.  So, I need to somehow share
>>> the 2GB between the processes.
>>
>> I would recommend using the multi-threaded map runner. Have 1 map/node and
>> just use 3 worker threads that all consume the input. The only disadvantage
>> is that it works best for cpu-heavy loads (or maps that are doing crawling,
>> etc.), since you only have one record reader for all three of the map
>> threads.
>>
>> In the longer term, it might make sense to enable parallel jvm reuse in
>> addition to serial jvm reuse.
>>
>> -- Owen
>>
> 

Re: Coordination between Mapper tasks

Posted by Stuart White <st...@gmail.com>.
Thanks to everyone for your feedback.  I'm unfamiliar with many of the
technologies you've mentioned, so it may take me some time to digest
all your responses.  The first thing I'm going to look at is Ted's
suggestion of a pure map-reduce solution by pre-joining my data with
my lookup values.

On Fri, Mar 20, 2009 at 9:55 AM, Owen O'Malley <ow...@gmail.com> wrote:
> On Thu, Mar 19, 2009 at 6:42 PM, Stuart White <st...@gmail.com>wrote:
>
>>
>> My process requires a large dictionary of terms (~ 2GB when loaded
>> into RAM).  The terms are looked-up very frequently, so I want the
>> terms memory-resident.
>>
>> So, the problem is, I want 3 processes (to utilize CPU), but each
>> process requires ~2GB, but my nodes don't have enough memory to each
>> have their own copy of the 2GB of data.  So, I need to somehow share
>> the 2GB between the processes.
>
>
> I would recommend using the multi-threaded map runner. Have 1 map/node and
> just use 3 worker threads that all consume the input. The only disadvantage
> is that it works best for cpu-heavy loads (or maps that are doing crawling,
> etc.), since you only have one record reader for all three of the map
> threads.
>
> In the longer term, it might make sense to enable parallel jvm reuse in
> addition to serial jvm reuse.
>
> -- Owen
>

Re: Coordination between Mapper tasks

Posted by Owen O'Malley <ow...@gmail.com>.
On Thu, Mar 19, 2009 at 6:42 PM, Stuart White <st...@gmail.com>wrote:

>
> My process requires a large dictionary of terms (~ 2GB when loaded
> into RAM).  The terms are looked-up very frequently, so I want the
> terms memory-resident.
>
> So, the problem is, I want 3 processes (to utilize CPU), but each
> process requires ~2GB, but my nodes don't have enough memory to each
> have their own copy of the 2GB of data.  So, I need to somehow share
> the 2GB between the processes.


I would recommend using the multi-threaded map runner. Have 1 map/node and
just use 3 worker threads that all consume the input. The only disadvantage
is that, since you only have one record reader for all three of the map
threads, it works best for cpu-heavy loads (or maps that are doing crawling,
etc.).
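
In rough code, that setup is just the following (a sketch against the old
mapred API; the thread-count property name may differ between Hadoop versions):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

public class MultithreadedSetup {
  public static JobConf configure(JobConf conf) {
    // One map task per node (mapred.tasktracker.map.tasks.maximum = 1 in
    // mapred-site.xml), with three worker threads sharing its record reader.
    conf.setMapRunnerClass(MultithreadedMapRunner.class);
    // Property name used by MultithreadedMapRunner in 0.19-era Hadoop;
    // check your version's documentation.
    conf.setInt("mapred.map.multithreadedrunner.threads", 3);
    return conf;
  }
}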

In the longer term, it might make sense to enable parallel jvm reuse in
addition to serial jvm reuse.

-- Owen

Re: Coordination between Mapper tasks

Posted by Steve Loughran <st...@apache.org>.
Stuart White wrote:
> The nodes in my cluster have 4 cores & 4 GB RAM.  So, I've set
> mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
> "breathing room").
> 
> My process requires a large dictionary of terms (~ 2GB when loaded
> into RAM).  The terms are looked-up very frequently, so I want the
> terms memory-resident.
> 
> So, the problem is, I want 3 processes (to utilize CPU), but each
> process requires ~2GB, but my nodes don't have enough memory to each
> have their own copy of the 2GB of data.  So, I need to somehow share
> the 2GB between the processes.
> 
> What I have currently implemented is a standalone RMI service that,
> during startup, loads the 2GB dictionaries.  My mappers are simply RMI
> clients that call this RMI service.
> 
> This works just fine.  The only problem is that my standalone RMI
> service is totally "outside" Hadoop.  I have to ssh onto each of the
> nodes, start/stop/reconfigure the services manually, etc...

There's nothing wrong with doing this outside Hadoop; the only problem 
is that manual deployment is not the way forward.

1. some kind of javaspace system where you put facts into the t-space 
and let them all share it

2. (CofI warning), use something like SmartFrog's anubis tuplespace to 
bring up one -and one only- node with the dictionary application. This 
may be hard to get started, but it keeps availability high -the anubis 
nodes keep track of all other members of the cluster by some 
heartbeat/election protocol, and can handle failures of the dictionary 
node by automatically bringing up a new one

3. Roll your own multicast/voting protocol, so avoiding RMI. Something 
scatter/gather style is needed as part of the Apache Cloud computing 
product portfolio, so you could try implementing it -Doug Cutting will 
probably provide constructive feedback.

I haven't played with zookeeper enough to say whether it would work here.

-steve

Re: Coordination between Mapper tasks

Posted by Ted Dunning <te...@gmail.com>.
Aaron makes lots of sense when he says that there are better ways to do this
lookup without making your mappers depend on each other.

But having a hadoop cluster slam a mysql farm with queries is asking for
trouble (I have tried it).  Hadoop mappers can saturate a mysql database so
*very* hard that it is a thing to behold.

There are lots of other options.  The idea of using Zookeeper to spawn a
special lookup thread on each machine isn't so bad, although I would avoid
RMI like the plague, preferring Thrift or something similar.  Having the
program that launches the map-reduce program launch a lookup cluster isn't a
bad option either (but it isn't as simple as just starting the map-reduce
program).  Another option is to use a lookup system that depends on the file
system cache for memory residency of the lookup table.

I would strongly recommend exploring a pure map-reduce solution to the
problem.  Try joining your lookup table to your map data using a preliminary
map-reduce step.  This is very easily done if you have a single lookup per
map invocation.  If you have a number of lookups, then pass through your
data producing lookup keys, each with a pointer back to your original record
key, and pass through your lookup table generating key/value pairs.  Reduce
on the lookup key and emit the original key plus the key/value pair from the
lookup table.  Make sure you eliminate duplicate key/value pairs at this
point.  Reduce that against your original data, and now you have your
original data with all of the lookup records each mapper needs in one place.
You are now set to go with your original problem, except the lookup
operation has been done ahead of time.
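
A sketch of that first pre-join pass with the old mapred API (it assumes a
Hadoop version with MultipleInputs, tab-separated key/value text inputs, and
illustrative class names and record parsing):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class PreJoin {

  // Data side: key = original record key, value = record body.
  // Emit (lookupKey, "D" + recordKey).
  public static class DataMapper extends MapReduceBase
      implements Mapper<Text, Text, Text, Text> {
    public void map(Text recordKey, Text record,
        OutputCollector<Text, Text> out, Reporter rep) throws IOException {
      // Illustrative: assume the lookup key is the first tab-separated field.
      String s = record.toString();
      int tab = s.indexOf('\t');
      String lookupKey = tab < 0 ? s : s.substring(0, tab);
      out.collect(new Text(lookupKey), new Text("D" + recordKey));
    }
  }

  // Lookup-table side: key = lookup key, value = lookup value.
  // Emit (lookupKey, "L" + value).
  public static class LookupMapper extends MapReduceBase
      implements Mapper<Text, Text, Text, Text> {
    public void map(Text lookupKey, Text value,
        OutputCollector<Text, Text> out, Reporter rep) throws IOException {
      out.collect(lookupKey, new Text("L" + value));
    }
  }

  // Join: attach the (de-duplicated) lookup value to every record key that
  // referenced it.  Buffering record keys is fine for a sketch; very high
  // fan-in would call for a secondary sort instead.
  public static class JoinReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text lookupKey, Iterator<Text> values,
        OutputCollector<Text, Text> out, Reporter rep) throws IOException {
      String lookupValue = null;
      List<String> recordKeys = new ArrayList<String>();
      while (values.hasNext()) {
        String v = values.next().toString();
        if (v.startsWith("L")) lookupValue = v.substring(1);
        else recordKeys.add(v.substring(1));
      }
      if (lookupValue == null) return;  // key never appeared in the lookup table
      for (String rk : recordKeys) {
        out.collect(new Text(rk), new Text(lookupKey + "\t" + lookupValue));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf job = new JobConf(PreJoin.class);
    job.setJobName("pre-join");
    MultipleInputs.addInputPath(job, new Path(args[0]),
        KeyValueTextInputFormat.class, DataMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
        KeyValueTextInputFormat.class, LookupMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setReducerClass(JoinReducer.class);
    JobClient.runJob(job);
  }
}

The second pass follows the same tag-and-reduce pattern, joining this output
back against the original data on the record key.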

This sounds outrageously expensive, but because all disk I/O is sequential
it can be surprisingly fast even when the intermediate data steps are quite
large.

On Thu, Mar 19, 2009 at 8:46 PM, Aaron Kimball <aa...@cloudera.com> wrote:

>
> Are you using multiple machines for your processing? Rolling your own RMI
> service to provide data to your other system seems like asking for tricky
> bugs. Why not just put the dictionary terms into a mysql database? Your
> mappers could then select against this database, pulling in data
> incrementally, and discarding data they don't need. If you configured
> memcached (like Jim suggests), then you can even get some memory-based
> performance boosts too by sharing common reads.
>
--
Ted Dunning, CTO
DeepDyve

Re: Coordination between Mapper tasks

Posted by Aaron Kimball <aa...@cloudera.com>.
Stuart,

Are you using multiple machines for your processing? Rolling your own RMI
service to provide data to your other system seems like asking for tricky
bugs. Why not just put the dictionary terms into a mysql database? Your
mappers could then select against this database, pulling in data
incrementally, and discarding data they don't need. If you configured
memcached (like Jim suggests), then you can even get some memory-based
performance boosts too by sharing common reads.
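
A rough sketch of that mapper-side lookup (the JDBC URL, credentials, and
table/column names are illustrative; you would typically open this once per
task, e.g. in configure(), and close it in close()):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch of a per-task lookup against a shared MySQL table.  The MySQL
// driver jar must be on the task classpath.
public class DictionaryDao {

  private final Connection conn;
  private final PreparedStatement stmt;

  public DictionaryDao() throws SQLException {
    conn = DriverManager.getConnection(
        "jdbc:mysql://dbhost/dictionary", "hadoop", "secret");
    stmt = conn.prepareStatement("SELECT value FROM terms WHERE term = ?");
  }

  // One incremental lookup; callers keep only what they need.
  public String lookup(String term) throws SQLException {
    stmt.setString(1, term);
    ResultSet rs = stmt.executeQuery();
    try {
      return rs.next() ? rs.getString(1) : null;
    } finally {
      rs.close();
    }
  }

  public void close() throws SQLException {
    stmt.close();
    conn.close();
  }
}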

- Aaron

On Thu, Mar 19, 2009 at 8:27 PM, Jim Twensky <ji...@gmail.com> wrote:

> Stuart,
>
> Why do you use RMI to load your dictionary file? I presume you have (key,
> value) pairs and each of your mappers do numerous lookups to those pairs.
> In that case, using memcached may be a simpler option and again, you don't
> have to allocate a separate 2 GB space for each of those 3 processes. Would
> this again be a problem for you to start/stop manually? If so, you may also
> consider using the distributed cache, like Aaron mentioned above. I've never
> tried using the distributed cache for files as large as 2 GB, but it is
> still worth trying since it fits in your memory.
>
> Jim
>
> On Thu, Mar 19, 2009 at 8:42 PM, Stuart White <stuart.white1@gmail.com>wrote:
>
> > The nodes in my cluster have 4 cores & 4 GB RAM.  So, I've set
> > mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
> > "breathing room").
> >
> > My process requires a large dictionary of terms (~ 2GB when loaded
> > into RAM).  The terms are looked-up very frequently, so I want the
> > terms memory-resident.
> >
> > So, the problem is, I want 3 processes (to utilize CPU), but each
> > process requires ~2GB, but my nodes don't have enough memory to each
> > have their own copy of the 2GB of data.  So, I need to somehow share
> > the 2GB between the processes.
> >
> > What I have currently implemented is a standalone RMI service that,
> > during startup, loads the 2GB dictionaries.  My mappers are simply RMI
> > clients that call this RMI service.
> >
> > This works just fine.  The only problem is that my standalone RMI
> > service is totally "outside" Hadoop.  I have to ssh onto each of the
> > nodes, start/stop/reconfigure the services manually, etc...
> >
> > So, I was thinking that, at job startup, the processes on each node
> > would (using ZooKeeper) elect a leader responsible for hosting the 2GB
> > dictionaries.  This process would load the dictionaries and share them
> > via RMI.  The other processes would recognize that another process on
> > the box is the leader, and they would act as RMI clients to that
> > process.
> >
> > To make this work, I'm calling conf.setNumTasksToExecutePerJvm(-1) so
> > that Hadoop does not create new JVMs for each task.
> >
> > Also note that the processes are "grouped" by node; that is, the
> > ZooKeeper path that I'll use for coordination will include the
> > hostname, so that only processes on the same node will compete for
> > leadership.
> >
> > Anyway, in short, I was looking for a way to elect a leader process
> > per node responsible for hosting/sharing a large amount of
> > memory-resident data via RMI.
> >
> > Hopefully that made sense...
> >
>

Re: Coordination between Mapper tasks

Posted by Sean Shanny <ss...@tripadvisor.com>.
Stuart,

We use Hadoop in parts of our ETL processing for our data warehouse.
We ran into a similar problem of needing to share about 60 million
key/value pairs (dimension keys) amongst the mapper jobs running in the
final phase of our ETL process.  Our cluster is a small 3-machine, 20-core
system.  We ended up setting up 2 nodes with memcached, both set at 4 GB.
All 3 nodes use the cache via a Java client using batch fetching (about
35 lookups per input record) and the system is performing well.  It has
the added benefit of being scalable.  Some of our keys can be long, so we
end up MD5-hashing them to work within the limits of memcached as well as
consuming less space.  We flush the cache every day and it takes about
12 minutes to populate it.  I highly recommend it as something to explore.
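
For illustration, a rough sketch of that client pattern with spymemcached
(server addresses and class names are illustrative):

import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;

public class DimensionLookup {

  private final MemcachedClient client;

  public DimensionLookup(String servers) throws Exception {
    // e.g. "cache1:11211 cache2:11211" -- hostnames are illustrative
    client = new MemcachedClient(AddrUtil.getAddresses(servers));
  }

  // MD5-hash long natural keys so they stay within memcached's 250-byte
  // key limit (and take up less space).
  static String md5Key(String naturalKey) throws Exception {
    MessageDigest md = MessageDigest.getInstance("MD5");
    byte[] digest = md.digest(naturalKey.getBytes("UTF-8"));
    StringBuilder hex = new StringBuilder();
    for (byte b : digest) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  // Batch-fetch every lookup an input record needs in one round trip.
  // The returned map is keyed by the hashed keys, so the caller maps them
  // back to the natural keys.
  public Map<String, Object> lookup(Collection<String> naturalKeys) throws Exception {
    List<String> hashed = new ArrayList<String>(naturalKeys.size());
    for (String k : naturalKeys) {
      hashed.add(md5Key(k));
    }
    return client.getBulk(hashed);
  }
}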

--sean

On Mar 19, 2009, at 11:27 PM, Jim Twensky wrote:

> Stuart,
>
> Why do you use RMI to load your dictionary file? I presume you have (key,
> value) pairs and each of your mappers do numerous lookups to those pairs.
> In that case, using memcached may be a simpler option and again, you don't
> have to allocate a separate 2 GB space for each of those 3 processes. Would
> this again be a problem for you to start/stop manually? If so, you may also
> consider using the distributed cache, like Aaron mentioned above. I've never
> tried using the distributed cache for files as large as 2 GB, but it is
> still worth trying since it fits in your memory.
>
> Jim
>
> On Thu, Mar 19, 2009 at 8:42 PM, Stuart White <st...@gmail.com>wrote:
>
>> The nodes in my cluster have 4 cores & 4 GB RAM.  So, I've set
>> mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
>> "breathing room").
>>
>> My process requires a large dictionary of terms (~ 2GB when loaded
>> into RAM).  The terms are looked-up very frequently, so I want the
>> terms memory-resident.
>>
>> So, the problem is, I want 3 processes (to utilize CPU), but each
>> process requires ~2GB, but my nodes don't have enough memory to each
>> have their own copy of the 2GB of data.  So, I need to somehow share
>> the 2GB between the processes.
>>
>> What I have currently implemented is a standalone RMI service that,
>> during startup, loads the 2GB dictionaries.  My mappers are simply RMI
>> clients that call this RMI service.
>>
>> This works just fine.  The only problem is that my standalone RMI
>> service is totally "outside" Hadoop.  I have to ssh onto each of the
>> nodes, start/stop/reconfigure the services manually, etc...
>>
>> So, I was thinking that, at job startup, the processes on each node
>> would (using ZooKeeper) elect a leader responsible for hosting the 2GB
>> dictionaries.  This process would load the dictionaries and share them
>> via RMI.  The other processes would recognize that another process on
>> the box is the leader, and they would act as RMI clients to that
>> process.
>>
>> To make this work, I'm calling conf.setNumTasksToExecutePerJvm(-1) so
>> that Hadoop does not create new JVMs for each task.
>>
>> Also note that the processes are "grouped" by node; that is, the
>> ZooKeeper path that I'll use for coordination will include the
>> hostname, so that only processes on the same node will compete for
>> leadership.
>>
>> Anyway, in short, I was looking for a way to elect a leader process
>> per node responsible for hosting/sharing a large amount of
>> memory-resident data via RMI.
>>
>> Hopefully that made sense...
>>


Re: Coordination between Mapper tasks

Posted by Jim Twensky <ji...@gmail.com>.
Stuart,

Why do you use RMI to load your dictionary file? I presume you have (key,
value) pairs and each of your mappers do numerous lookups to those pairs. In
that case, using memcached may be a simpler option and again, you don't have
to allocate a separate 2 GB space for each of those 3 processes. Would this
again be a problem for you to start/stop manually? If so, you may also
consider using the distributed cache, like Aaron mentioned above. I've never
tried using the distributed cache for files as large as 2 GB, but it is
still worth trying since it fits in your memory.

Jim

On Thu, Mar 19, 2009 at 8:42 PM, Stuart White <st...@gmail.com>wrote:

> The nodes in my cluster have 4 cores & 4 GB RAM.  So, I've set
> mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
> "breathing room").
>
> My process requires a large dictionary of terms (~ 2GB when loaded
> into RAM).  The terms are looked-up very frequently, so I want the
> terms memory-resident.
>
> So, the problem is, I want 3 processes (to utilize CPU), but each
> process requires ~2GB, but my nodes don't have enough memory to each
> have their own copy of the 2GB of data.  So, I need to somehow share
> the 2GB between the processes.
>
> What I have currently implemented is a standalone RMI service that,
> during startup, loads the 2GB dictionaries.  My mappers are simply RMI
> clients that call this RMI service.
>
> This works just fine.  The only problem is that my standalone RMI
> service is totally "outside" Hadoop.  I have to ssh onto each of the
> nodes, start/stop/reconfigure the services manually, etc...
>
> So, I was thinking that, at job startup, the processes on each node
> would (using ZooKeeper) elect a leader responsible for hosting the 2GB
> dictionaries.  This process would load the dictionaries and share them
> via RMI.  The other processes would recognize that another process on
> the box is the leader, and they would act as RMI clients to that
> process.
>
> To make this work, I'm calling conf.setNumTasksToExecutePerJvm(-1) so
> that Hadoop does not create new JVMs for each task.
>
> Also note that the processes are "grouped" by node; that is, the
> ZooKeeper path that I'll use for coordination will include the
> hostname, so that only processes on the same node will compete for
> leadership.
>
> Anyway, in short, I was looking for a way to elect a leader process
> per node responsible for hosting/sharing a large amount of
> memory-resident data via RMI.
>
> Hopefully that made sense...
>

Re: Coordination between Mapper tasks

Posted by Stuart White <st...@gmail.com>.
The nodes in my cluster have 4 cores & 4 GB RAM.  So, I've set
mapred.tasktracker.map.tasks.maximum to 3 (leaving 1 core for
"breathing room").

My process requires a large dictionary of terms (~ 2GB when loaded
into RAM).  The terms are looked-up very frequently, so I want the
terms memory-resident.

So, the problem is, I want 3 processes (to utilize CPU), but each
process requires ~2GB, but my nodes don't have enough memory to each
have their own copy of the 2GB of data.  So, I need to somehow share
the 2GB between the processes.

What I have currently implemented is a standalone RMI service that,
during startup, loads the 2GB dictionaries.  My mappers are simply RMI
clients that call this RMI service.

This works just fine.  The only problem is that my standalone RMI
service is totally "outside" Hadoop.  I have to ssh onto each of the
nodes, start/stop/reconfigure the services manually, etc...

So, I was thinking that, at job startup, the processes on each node
would (using ZooKeeper) elect a leader responsible for hosting the 2GB
dictionaries.  This process would load the dictionaries and share them
via RMI.  The other processes would recognize that another process on
the box is the leader, and they would act as RMI clients to that
process.

To make this work, I'm calling conf.setNumTasksToExecutePerJvm(-1) so
that Hadoop does not create new JVMs for each task.

Also note that the processes are "grouped" by node; that is, the
ZooKeeper path that I'll use for coordination will include the
hostname, so that only processes on the same node will compete for
leadership.
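
Roughly, what I'm picturing for the per-host election looks like the sketch
below (znode names and layout are illustrative; real code would also watch
the leader znode and re-elect if it disappears):

import java.net.InetAddress;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class NodeLeaderElection {

  // Returns true if this task should host the shared dictionary on this host.
  public static boolean amLeader(ZooKeeper zk) throws Exception {
    String host = InetAddress.getLocalHost().getHostName();
    String base = "/dictionary-leaders/" + host;   // path layout is illustrative
    ensure(zk, "/dictionary-leaders");
    ensure(zk, base);

    // Ephemeral sequential znode: it disappears if this task's JVM dies.
    String me = zk.create(base + "/task-", new byte[0],
        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    List<String> children = zk.getChildren(base, false);
    Collections.sort(children);
    // Lowest sequence number wins the election for this host.
    return me.endsWith(children.get(0));
  }

  private static void ensure(ZooKeeper zk, String path) throws Exception {
    try {
      zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException ignored) {
      // another task created it first; that's fine
    }
  }
}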

Anyway, in short, I was looking for a way to elect a leader process
per node responsible for hosting/sharing a large amount of
memory-resident data via RMI.

Hopefully that made sense...

Re: Coordination between Mapper tasks

Posted by Aaron Kimball <aa...@cloudera.com>.
A reasonable question is: are you sure you need coordination?

A lot of problems that look like they require coordination between mappers
can actually be made to work more scalably (and much more simply!) by
decomposing them into two back-to-back MapReduce jobs, where some data is
exchanged/aggregated in the middle.

Also, you can send data to all the mappers ahead of time by writing it out
into files that are spread to every node using the DistributedCache. So if
you have a relatively small amount of metadata that's known ahead of the
job, you could send that out in a DCache file to all the mappers.
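
As a sketch of that pattern with the old mapred API (the file path, record
format, and class names are illustrative):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DCacheLookupExample {

  // Driver side: ship a metadata file (already in HDFS) to every task node.
  public static JobConf buildJob() throws Exception {
    JobConf conf = new JobConf(DCacheLookupExample.class);
    DistributedCache.addCacheFile(new URI("/meta/terms.txt"), conf); // illustrative path
    conf.setMapperClass(TermMapper.class);
    return conf;
  }

  public static class TermMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    private final Map<String, String> terms = new HashMap<String, String>();

    // Runs once per task: load the locally cached copy of the file.
    public void configure(JobConf job) {
      try {
        Path[] cached = DistributedCache.getLocalCacheFiles(job);
        BufferedReader in = new BufferedReader(new FileReader(cached[0].toString()));
        String line;
        while ((line = in.readLine()) != null) {
          String[] kv = line.split("\t", 2);     // "term<TAB>value" per line (illustrative)
          terms.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        in.close();
      } catch (IOException e) {
        throw new RuntimeException("failed to load cached terms file", e);
      }
    }

    public void map(LongWritable offset, Text record,
        OutputCollector<Text, Text> out, Reporter rep) throws IOException {
      String term = record.toString();           // adapt to your record format
      String value = terms.get(term);
      if (value != null) {
        out.collect(new Text(term), new Text(value)); // no remote call needed
      }
    }
  }
}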

Maybe if you could describe your goals more fully, we could offer some more
specific advice on your situation.

Cheers,
- Aaron


On Thu, Mar 19, 2009 at 10:03 AM, Owen O'Malley <om...@apache.org> wrote:

>
> On Mar 18, 2009, at 10:26 AM, Stuart White wrote:
>
>  I'd like to implement some coordination between Mapper tasks running
>> on the same node.  I was thinking of using ZooKeeper to provide this
>> coordination.
>>
>
> This is a very bad idea in the general case. It can be made to work, but
> you need to have a dedicated cluster so that you are sure they are all
> active at the same time. Otherwise, you have no guarantee that all of the
> maps are running at the same time.
>
> In most cases, you are much better off using the standard communication
> between the maps and reduces and making multiple passes of jobs.
>
>  I think I remember hearing that MapReduce and/or HDFS use ZooKeeper
>> under-the-covers.
>>
>
> There are no immediate plans to implement HA yet.
>
> -- Owen
>

Re: Coordination between Mapper tasks

Posted by Owen O'Malley <om...@apache.org>.
On Mar 18, 2009, at 10:26 AM, Stuart White wrote:

> I'd like to implement some coordination between Mapper tasks running
> on the same node.  I was thinking of using ZooKeeper to provide this
> coordination.

This is a very bad idea in the general case. It can be made to work,  
but you need to have a dedicated cluster so that you are sure they are  
all active at the same time. Otherwise, you have no guarantee that all  
of the maps are running at the same time.

In most cases, you are much better off using the standard  
communication between the maps and reduces and making multiple passes  
of jobs.

> I think I remember hearing that MapReduce and/or HDFS use ZooKeeper
> under-the-covers.

There are no immediate plans to implement HA yet.

-- Owen