You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by Ted Dunning <td...@veoh.com> on 2007/08/22 19:55:38 UTC

Poly-reduce?

I am finding that it is a common pattern that multi-phase map-reduce
programs I need to write very often have nearly degenerate map functions in
second and later map-reduce phases.  The only need for these function is to
select the next reduce key and very often, a local combiner can be used to
greatly decrease the number of records passed to the second reduce.

It isn't hard to implement these programs as multiple fully fledged
map-reduces, but it appears to me that many of them would be better
expressed as something more like a map-reduce-reduce program.

For example, take the problem of coocurrence counting in log records.  The
first map would extract a user id and an object id and group on user id.
The second reduce would take entire sessions for a single user and generate
co-occurrence pairs as keys for the second reduce, each with a count
determined by the frequency of the objects in the user history.  The second
reduce (and local combiner) would aggregate these counts and discard items
with small counts. 

Expressed conventionally, this would have write all of the user sessions to
HDFS and a second map phase would generate the pairs for counting.  The
opportunity for efficiency would come from the ability to avoid writing
intermediate results to the distributed data store.
    
Has anybody looked at whether this would help and whether it would be hard
to do?



Re: Poly-reduce?

Posted by Owen O'Malley <oo...@yahoo-inc.com>.
On Aug 22, 2007, at 10:55 AM, Ted Dunning wrote:

> I am finding that it is a common pattern that multi-phase map-reduce
> programs I need to write very often have nearly degenerate map  
> functions in
> second and later map-reduce phases.  The only need for these  
> function is to
> select the next reduce key and very often, a local combiner can be  
> used to
> greatly decrease the number of records passed to the second reduce.

My opinion is that handling these kinds of patterns in the framework  
itself is a mistake. It would introduce a lot of complexity and the  
payback would be relatively slight in terms of the application. I'd  
much rather have the Hadoop framework support the single primitive  
(map/reduce) very well and build a layer on top that provides a very  
general algebra over map/reduce operations. One early example of this  
is Pig (http://research.yahoo.com/project/pig).

-- Owen

Re: Poly-reduce?

Posted by Eric Baldeschwieler <er...@yahoo-inc.com>.
especially at scale!

And we are testing on >1000 node clusters with long jobs.  We see  
lots of failures per job.

On Aug 24, 2007, at 4:20 PM, Ted Dunning wrote:

>
>
>
> On 8/24/07 12:11 PM, "Doug Cutting" <cu...@apache.org> wrote:
>
> >
> >> Using the same logic, streaming reduce outputs to
> >> the next map and reduce steps (before the first reduce is complete)
> >> should also provide speedup.
> >
> > Perhaps, but the bookeeping required in the jobtracker might be  
> onerous.
> >   The failure modes are more complex, complicating recovery.
>
> Frankly, I find Doug's arguments about reliability fairly compelling.
>
> Map-reduce^n is not the same, nor is it entirely analogous to pipe- 
> style
> programming.  It feels the same, but there are very important  
> differences
> that I wasn't thinking of when I made this suggestion.  The most  
> important
> is the issue of reliability.  In a large cluster, failure is a  
> continuous
> process, not an isolated event.  As such, the problems of having to  
> roll
> back an entire program due to node failure are not something that  
> can be
> treated as unusual.  That makes Doug's comments about risk more on- 
> point
> than the potential gains.  It is very easy to imagine scenarios  
> where the
> possibility of program roll-back results in very large average run- 
> times
> while the chained reduce results in only incremental savings.  This  
> isn't a
> good thing to bet on.
>
>


Re: Poly-reduce?

Posted by Ted Dunning <td...@veoh.com>.


On 8/24/07 12:11 PM, "Doug Cutting" <cu...@apache.org> wrote:

> 
>> Using the same logic, streaming reduce outputs to
>> the next map and reduce steps (before the first reduce is complete)
>> should also provide speedup.
> 
> Perhaps, but the bookeeping required in the jobtracker might be onerous.
>   The failure modes are more complex, complicating recovery.

Frankly, I find Doug's arguments about reliability fairly compelling.

Map-reduce^n is not the same, nor is it entirely analogous to pipe-style
programming.  It feels the same, but there are very important differences
that I wasn't thinking of when I made this suggestion.  The most important
is the issue of reliability.  In a large cluster, failure is a continuous
process, not an isolated event.  As such, the problems of having to roll
back an entire program due to node failure are not something that can be
treated as unusual.  That makes Doug's comments about risk more on-point
than the potential gains.  It is very easy to imagine scenarios where the
possibility of program roll-back results in very large average run-times
while the chained reduce results in only incremental savings.  This isn't a
good thing to bet on.



Re: Poly-reduce?

Posted by Doug Cutting <cu...@apache.org>.
Joydeep Sen Sarma wrote:
> Would be cool to get an option to reduce replication factor for reduce
> outputs.

This should be as simple as setting dfs.replication on a job.  If that 
does not work, it's a bug.

> Hard to buy the argument that there's gonna be no performance win with
> direct streaming between jobs. Currently reduce jobs start reading map
> outputs before all maps are complete - and I am sure this results in
> significant speedup. 

That's not exactly true.  No reduces can be performed until all maps are 
complete.  However the shuffle (transfer/sort of intermediate data) 
happens in parallel with mapping.

> Using the same logic, streaming reduce outputs to
> the next map and reduce steps (before the first reduce is complete)
> should also provide speedup. 

Perhaps, but the bookeeping required in the jobtracker might be onerous. 
  The failure modes are more complex, complicating recovery.

> If the streaming option were available, the programmer would have a
> clear choice: excellent best case/poor worst case performance with
> streaming or good best case/good worst case performance with hdfs based
> checkpointing. I think this is a choice that the job-writer is competent
> enough to make.

Please feel free to try to implement this.  If you can develop a patch 
which implements this reliably in maintainable code, then it would 
probably be committed.

Doug

RE: Poly-reduce?

Posted by Joydeep Sen Sarma <js...@facebook.com>.
Would be cool to get an option to reduce replication factor for reduce
outputs.

Hard to buy the argument that there's gonna be no performance win with
direct streaming between jobs. Currently reduce jobs start reading map
outputs before all maps are complete - and I am sure this results in
significant speedup. Using the same logic, streaming reduce outputs to
the next map and reduce steps (before the first reduce is complete)
should also provide speedup. 

If the streaming option were available, the programmer would have a
clear choice: excellent best case/poor worst case performance with
streaming or good best case/good worst case performance with hdfs based
checkpointing. I think this is a choice that the job-writer is competent
enough to make.


To Owen's reference to PIG - I am curious whether the PIG codebase also
frequently chains multiple map-reduce jobs to perform a single Pig
operation? (especially since my experience resulted from the need to
write some complicated multi-way joins). Anyone from Pig developer
community who can chime in?

Joydeep




-----Original Message-----
From: Doug Cutting [mailto:cutting@apache.org] 
Sent: Friday, August 24, 2007 9:54 AM
To: hadoop-user@lucene.apache.org
Subject: Re: Poly-reduce?

Ted Dunning wrote:
> It isn't hard to implement these programs as multiple fully fledged
> map-reduces, but it appears to me that many of them would be better
> expressed as something more like a map-reduce-reduce program.
> 
> [ ... ]
> 
> Expressed conventionally, this would have write all of the user
sessions to
> HDFS and a second map phase would generate the pairs for counting.
The
> opportunity for efficiency would come from the ability to avoid
writing
> intermediate results to the distributed data store.
>     
> Has anybody looked at whether this would help and whether it would be
hard
> to do?

It would job tracker more complicated, and might not help job execution 
time that much.

Consider implementing this as multiple map reduce steps, but using a 
replication level of one for intermediate data.  That would mostly have 
the performance characteristics you want.  But if a node died, things 
could not intelligently automatically re-create just the missing data. 
Instead the application would have to re-run the entire job, or subsets 
of it, in order to re-create the un-replicated data.

Under poly-reduce, if a node failed, all tasks that were incomplete on 
that node would need to be restarted.  But first, their input data would

need to be located.  If you saved all intermediate data in the course of

a job (which would be expensive) then the inputs that need re-creation 
would mostly just be those that were created on the failed node.  But 
this failure would generally cascade all the way back to the initial map

stage.  So a single machine failure in the last phase could double the 
run time of the job, with most of the cluster idle.

If, instead, you used normal mapreduce, with intermediate data 
replicated in the filesystem, a single machine failure in the last phase

would only require re-running tasks from the last job.

Perhaps, when chaining mapreduces, one should use a lower replication 
level for intermediate data, like two.  Additionally, one might wish to 
relax the one-replica-off-rack criterion for such files, so that 
replication is faster, and since whole-rack failures are rare.  This 
might give good chained performance, but keep machine failures from 
knocking tasks back to the start of the chain.  Currently its not 
possible to disable the one-replica-off-rack preference, but that might 
be a reasonable feature request.

Doug


Re: Poly-reduce?

Posted by Vuk Ercegovac <ve...@us.ibm.com>.
We found Map/Reduce job composition to be useful when intermediate result
sizes were comparable or larger than the size of their input.

We implemented a prototype where intermediate Map/Reduce steps are composed
into  a single task. Pictorially, we transform a sequence of Map and Reduce
jobs:
    M -> R, M -> R, M -> R      // default behavior
into:
    M -> R|M -> R|M -> R        // with composition

Here,  “R|M”  denotes the composition of a Reduce (R) with a subsequent Map
(M),  allowing  the  output  of  R  to  be  directly  consumed by M. In our
prototype,  the  programmer  provides  the  same  Map and Reduce classes as
before,  as well as the sequence of Map/Reduce jobs to run. Then, under the
covers,  the system takes care of composing the intermediate Map and Reduce
steps.

To  illustrate how this can improve performance, denote a read and write to
HDFS  as  “rh”  and  “wh”, respectively, and denote a read and write to the
local  file  system  read  as  “rl”,  and  “wl”.  Finally, denote a network
transfer  by  “n”.  Then,  in  terms of disk and network I/O, the two cases
above can be described in more detail as:
    M(rh,  wl,  n)  ->  R(wl, rl, wh),  M(rh, wl, n) -> R(wl, rl, wh)    //
default behavior
verses:
    M(rh, wl, n) -> R|M(wl, rl, wl, n) -> R(wl, rl, wh)                  //
with composition

Comparing  the  two  cases,  there  are  two  key points to notice: 1) Each
composed  R|M  task eliminates an HDFS read and write, 2) If a node with an
R|M  task  fails  before  its  output  is  consumed, the whole task must be
re-evaluated,  since  data  is  not written to HDFS. This re-evaluation can
potentially  cascade back to a Map task from the first step but it does not
necessarily require that the whole job be re-evaluated.

To  compare  the  two cases analytically, assume for the sake of simplicity
that  the  size  of the input data is equal to the size of intermediate and
output  data. Let a “pass” over the data denote reading or writing the data
to  disk  or transferring the data over the network. Then with N Map/Reduce
jobs, we have:
    (N  jobs* 2 tasks/job * 3 passes/job) = 6*N passes           // default
behavior
verses:
    3               +               4*(N-1)               +               3
passes                                                          //     with
composition

Comparing  the  two  cases,  we  have 6 + 6*(N-1) passes verses 6 + 4*(N-1)
passes, which is a savings of 33% with composition.

With  our  prototype  running on a 10-node cluster, we achieved a 15 to 20%
speedup  on a workload with 3- to 5-way joins using Map/Reduce composition.
Note  that  we  used  a  replication  factor of 1 when writing HDFS in both
cases.  While  we  have  not yet explored scheduling and other optimization
issues,  knowing  that  multiple  jobs  are  related  may  lead  to further
improvements in performance.

Our prototype did add complexity to the JobTracker, since now a sequence of
Map/Reduce  jobs  must  be  configured and tracked. This makes job.xml more
complicated.  Also,  as  noted earlier, an intermediate R|M step writes its
output  to the local file system instead of HDFS. So if its node fails, the
output  on that node is lost. But if the output of R|M steps are written to
HDFS  instead,  then  we  can  still  avoid  2  passes over the data, while
benefiting  from  the fault tolerance that HDFS provides to avoid cascading
re-evaulation. We did not implement this, however.

To summarize, Map/Reduce composition can improve the performance of a
sequence of Map/Reduce jobs by eliminating disk I/O for intermediate
results. However, it comes with increased system complexity and possibly
more re-evaluations in the face of node failures. We would be interested in
getting more feedback on whether people think some of the ideas regarding
Map/Reduce composition are worth considering in Hadoop.

Thanks!

Vuk



                                                                           
             Milind Bhandarkar                                             
             <milindb@yahoo-in                                             
             c.com>                                                     To 
                                       <ha...@lucene.apache.org>     
             08/29/2007 11:10                                           cc 
             PM                                                            
                                                                   Subject 
                                       Re: Poly-reduce?                    
             Please respond to                                             
             hadoop-user@lucen                                             
               e.apache.org                                                
                                                                           
                                                                           
                                                                           




I agree with Owen and Doug. As long as the intermediate outputs (i.e. Data
in between phases) are stored on tasktrackers' local disks, prone to
failure, having more than two phases will be counterproductive. If
intermediate data storage were on a fault-tolerant DFS, one would see more
benefits of chaining arbitrary sequence of phases. (But then the reasoning
in the original email for having multiple-phases, i.e not having to upload
data to DFS, would no longer be valid.)

- milind


On 8/24/07 9:53 AM, "Doug Cutting" <cu...@apache.org> wrote:

> Ted Dunning wrote:
>> It isn't hard to implement these programs as multiple fully fledged
>> map-reduces, but it appears to me that many of them would be better
>> expressed as something more like a map-reduce-reduce program.
>>
>> [ ... ]
>>
>> Expressed conventionally, this would have write all of the user sessions
to
>> HDFS and a second map phase would generate the pairs for counting.  The
>> opportunity for efficiency would come from the ability to avoid writing
>> intermediate results to the distributed data store.
>>
>> Has anybody looked at whether this would help and whether it would be
hard
>> to do?
>
> It would job tracker more complicated, and might not help job execution
> time that much.
>
> Consider implementing this as multiple map reduce steps, but using a
> replication level of one for intermediate data.  That would mostly have
> the performance characteristics you want.  But if a node died, things
> could not intelligently automatically re-create just the missing data.
> Instead the application would have to re-run the entire job, or subsets
> of it, in order to re-create the un-replicated data.
>
> Under poly-reduce, if a node failed, all tasks that were incomplete on
> that node would need to be restarted.  But first, their input data would
> need to be located.  If you saved all intermediate data in the course of
> a job (which would be expensive) then the inputs that need re-creation
> would mostly just be those that were created on the failed node.  But
> this failure would generally cascade all the way back to the initial map
> stage.  So a single machine failure in the last phase could double the
> run time of the job, with most of the cluster idle.
>
> If, instead, you used normal mapreduce, with intermediate data
> replicated in the filesystem, a single machine failure in the last phase
> would only require re-running tasks from the last job.
>
> Perhaps, when chaining mapreduces, one should use a lower replication
> level for intermediate data, like two.  Additionally, one might wish to
> relax the one-replica-off-rack criterion for such files, so that
> replication is faster, and since whole-rack failures are rare.  This
> might give good chained performance, but keep machine failures from
> knocking tasks back to the start of the chain.  Currently its not
> possible to disable the one-replica-off-rack preference, but that might
> be a reasonable feature request.
>
> Doug
>

--
Milind Bhandarkar
408-349-2136
(milindb@yahoo-inc.com)


Re: Poly-reduce?

Posted by Milind Bhandarkar <mi...@yahoo-inc.com>.
I agree with Owen and Doug. As long as the intermediate outputs (i.e. Data
in between phases) are stored on tasktrackers' local disks, prone to
failure, having more than two phases will be counterproductive. If
intermediate data storage were on a fault-tolerant DFS, one would see more
benefits of chaining arbitrary sequence of phases. (But then the reasoning
in the original email for having multiple-phases, i.e not having to upload
data to DFS, would no longer be valid.)

- milind


On 8/24/07 9:53 AM, "Doug Cutting" <cu...@apache.org> wrote:

> Ted Dunning wrote:
>> It isn't hard to implement these programs as multiple fully fledged
>> map-reduces, but it appears to me that many of them would be better
>> expressed as something more like a map-reduce-reduce program.
>> 
>> [ ... ]
>> 
>> Expressed conventionally, this would have write all of the user sessions to
>> HDFS and a second map phase would generate the pairs for counting.  The
>> opportunity for efficiency would come from the ability to avoid writing
>> intermediate results to the distributed data store.
>>     
>> Has anybody looked at whether this would help and whether it would be hard
>> to do?
> 
> It would job tracker more complicated, and might not help job execution
> time that much.
> 
> Consider implementing this as multiple map reduce steps, but using a
> replication level of one for intermediate data.  That would mostly have
> the performance characteristics you want.  But if a node died, things
> could not intelligently automatically re-create just the missing data.
> Instead the application would have to re-run the entire job, or subsets
> of it, in order to re-create the un-replicated data.
> 
> Under poly-reduce, if a node failed, all tasks that were incomplete on
> that node would need to be restarted.  But first, their input data would
> need to be located.  If you saved all intermediate data in the course of
> a job (which would be expensive) then the inputs that need re-creation
> would mostly just be those that were created on the failed node.  But
> this failure would generally cascade all the way back to the initial map
> stage.  So a single machine failure in the last phase could double the
> run time of the job, with most of the cluster idle.
> 
> If, instead, you used normal mapreduce, with intermediate data
> replicated in the filesystem, a single machine failure in the last phase
> would only require re-running tasks from the last job.
> 
> Perhaps, when chaining mapreduces, one should use a lower replication
> level for intermediate data, like two.  Additionally, one might wish to
> relax the one-replica-off-rack criterion for such files, so that
> replication is faster, and since whole-rack failures are rare.  This
> might give good chained performance, but keep machine failures from
> knocking tasks back to the start of the chain.  Currently its not
> possible to disable the one-replica-off-rack preference, but that might
> be a reasonable feature request.
> 
> Doug
> 

--
Milind Bhandarkar
408-349-2136
(milindb@yahoo-inc.com)


Re: Poly-reduce?

Posted by Doug Cutting <cu...@apache.org>.
Ted Dunning wrote:
> It isn't hard to implement these programs as multiple fully fledged
> map-reduces, but it appears to me that many of them would be better
> expressed as something more like a map-reduce-reduce program.
> 
> [ ... ]
> 
> Expressed conventionally, this would have write all of the user sessions to
> HDFS and a second map phase would generate the pairs for counting.  The
> opportunity for efficiency would come from the ability to avoid writing
> intermediate results to the distributed data store.
>     
> Has anybody looked at whether this would help and whether it would be hard
> to do?

It would job tracker more complicated, and might not help job execution 
time that much.

Consider implementing this as multiple map reduce steps, but using a 
replication level of one for intermediate data.  That would mostly have 
the performance characteristics you want.  But if a node died, things 
could not intelligently automatically re-create just the missing data. 
Instead the application would have to re-run the entire job, or subsets 
of it, in order to re-create the un-replicated data.

Under poly-reduce, if a node failed, all tasks that were incomplete on 
that node would need to be restarted.  But first, their input data would 
need to be located.  If you saved all intermediate data in the course of 
a job (which would be expensive) then the inputs that need re-creation 
would mostly just be those that were created on the failed node.  But 
this failure would generally cascade all the way back to the initial map 
stage.  So a single machine failure in the last phase could double the 
run time of the job, with most of the cluster idle.

If, instead, you used normal mapreduce, with intermediate data 
replicated in the filesystem, a single machine failure in the last phase 
would only require re-running tasks from the last job.

Perhaps, when chaining mapreduces, one should use a lower replication 
level for intermediate data, like two.  Additionally, one might wish to 
relax the one-replica-off-rack criterion for such files, so that 
replication is faster, and since whole-rack failures are rare.  This 
might give good chained performance, but keep machine failures from 
knocking tasks back to the start of the chain.  Currently its not 
possible to disable the one-replica-off-rack preference, but that might 
be a reasonable feature request.

Doug


Re: Poly-reduce?

Posted by "Peter W." <pe...@marketingbrokers.com>.
Hello,

Great observation, here's a hack that may be helpful
until such HDFS functionality is included.

You can put static Java collections inside of your class which
implements Reducer but outside of your reduce method to fix.

TreeMaps are good for this or (if(!HashMap.containsKey())).

Iterate thru your intermediate values in the while loop
placing items in these inner dictionary structures. After
the while loop (still inside reduce) get collections data,
and send to OutputCollector with session id as
WritableComparable key and Text value.

Regards,

Peter W.

On Aug 22, 2007, at 10:55 AM, Ted Dunning wrote:

>
> I am finding that it is a common pattern that multi-phase map-reduce
> programs I need to write very often have nearly degenerate map  
> functions in
> second and later map-reduce phases.  The only need for these  
> function is to
> select the next reduce key and very often, a local combiner can be  
> used to
> greatly decrease the number of records passed to the second reduce.
>
> It isn't hard to implement these programs as multiple fully fledged
> map-reduces, but it appears to me that many of them would be better
> expressed as something more like a map-reduce-reduce program.
>
> For example, take the problem of coocurrence counting in log  
> records.  The
> first map would extract a user id and an object id and group on  
> user id.
> The second reduce would take entire sessions for a single user and  
> generate
> co-occurrence pairs as keys for the second reduce, each with a count
> determined by the frequency of the objects in the user history.   
> The second
> reduce (and local combiner) would aggregate these counts and  
> discard items
> with small counts.
>
> Expressed conventionally, this would have write all of the user  
> sessions to
> HDFS and a second map phase would generate the pairs for counting.   
> The
> opportunity for efficiency would come from the ability to avoid  
> writing
> intermediate results to the distributed data store.
>
> Has anybody looked at whether this would help and whether it would  
> be hard
> to do?
>
>


Re: Poly-reduce?

Posted by Michele Catasta <mi...@deri.org>.
Hi all,
I'm just a month into using hadoop too, and it sounds like we are all
wishing for this kind of feature.

> 2. Map tasks of the next step are streamed data directly from preceding
> reduce tasks. This is more along the lines Ted is suggesting - make
> iterative map-reduce a primitive natively supported in Hadoop. This is
> probably a better solution - but more work?

I would like basically to do the same, with a mandatory condition:
without spilling data into temporary files. Keeping in RAM all the
files that the reduce outputs would be great in my context.

Maybe the solution could be an instance of InMemoryFileSystem? Just
passing the reference from the Reduce to the next Map (using an
external daemon... that it sounds to me like the only viable pattern
to do MapReduce chaining, correct me if I'm wrong)?

Would the inramfs distributed on all the nodes?

All the working solutions will be greatly appreciated :) I was just
supposing, the truth is that I still don't have a clue about it.

Regards,
-Michele Catasta

RE: Poly-reduce?

Posted by Joydeep Sen Sarma <js...@facebook.com>.
Completely agree. We are seeing the same pattern - need a series of
map-reduce jobs for most stuff. There are a few different alternatives
that may help:

1. The output of the intermediate reduce phases can be written to files
that are not replicated. Not sure whether we can do this through
map-reduce - but hdfs seems to be able to set replication level per
file.

2. Map tasks of the next step are streamed data directly from preceding
reduce tasks. This is more along the lines Ted is suggesting - make
iterative map-reduce a primitive natively supported in Hadoop. This is
probably a better solution - but more work? 

I am sure this has been encountered in other scenarios (heck - I am just
a month into using hadoop) - so would be interested to know what other
people are thinking and whether there are any upcoming features to
support this programming paradigm ..

Joydeep



-----Original Message-----
From: Ted Dunning [mailto:tdunning@veoh.com] 
Sent: Wednesday, August 22, 2007 10:56 AM
To: hadoop-user@lucene.apache.org
Subject: Poly-reduce?


I am finding that it is a common pattern that multi-phase map-reduce
programs I need to write very often have nearly degenerate map functions
in
second and later map-reduce phases.  The only need for these function is
to
select the next reduce key and very often, a local combiner can be used
to
greatly decrease the number of records passed to the second reduce.

It isn't hard to implement these programs as multiple fully fledged
map-reduces, but it appears to me that many of them would be better
expressed as something more like a map-reduce-reduce program.

For example, take the problem of coocurrence counting in log records.
The
first map would extract a user id and an object id and group on user id.
The second reduce would take entire sessions for a single user and
generate
co-occurrence pairs as keys for the second reduce, each with a count
determined by the frequency of the objects in the user history.  The
second
reduce (and local combiner) would aggregate these counts and discard
items
with small counts. 

Expressed conventionally, this would have write all of the user sessions
to
HDFS and a second map phase would generate the pairs for counting.  The
opportunity for efficiency would come from the ability to avoid writing
intermediate results to the distributed data store.
    
Has anybody looked at whether this would help and whether it would be
hard
to do?