Posted to mapreduce-user@hadoop.apache.org by Jacques <wh...@gmail.com> on 2011/02/13 21:18:00 UTC

Best approach for accessing secondary map task outputs from reduce tasks?

I'm outputting a small amount of secondary summary information from a map
task that I want to use in the reduce phase of the job.  This information is
keyed on a custom input split index.

Each map task outputs this summary information (less than a hundred bytes per
input task).  Note that the summary information isn't ready until the
completion of the map task.

Each reduce task needs to read this information (for all input splits) to
complete its task.

What is the best way to pass this information to the Reduce stage?  I'm
working in Java using cdhb2.  Ideas I had include:

1. Output this data to MapContext.getWorkOutputPath().  However, that data
is not available anywhere in the reduce stage.
2. Output this data to "mapred.output.dir".  The problem here is that the
map task writes immediately to this so failed jobs and speculative execution
could cause collision issues.
3. Output this data as in (1) and then use Mapper.cleanup() to copy these
files to "mapred.output.dir".  Could work but I'm still a little concerned
about collision/race issues as I'm not clear about when a Map task becomes
"the" committed map task for that split.
4. Use an external system to hold this information and then just call that
system from both phases.  This is basically a variant of #3 and has the
same issues.

Are there suggested approaches of how to do this?

It seems like (1) might make the most sense if there is a defined way to
stream secondary outputs from all the mappers within the Reduce.setup()
method.

Thanks for any ideas.

Jacques

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Chris Douglas <cd...@apache.org>.
On Sun, Feb 13, 2011 at 8:22 PM, Jason <ur...@gmail.com> wrote:
> I think this kind of partitioner is a little hackish. A more straightforward approach is to emit the extra data N times under special keys and write a partitioner that recognizes these keys and dispatches them accordingly among partitions 0..N-1.
> Also, if this data needs to be shipped to the reducers up front, that could easily be done using a custom sort comparator.

As listed in the assumptions, I thought each map emits only one datum
that must be read by every reduce, not one special datum among the
normal output. Changing the output record type to add the partition
struck me as overly formal, so the hackish solution seemed
appropriate. If the summary data complement record data emitted from
the map, then composing the job as you describe is standard.

However, if the map is non-deterministic, then, again, all of the
output (not just the summary data) from the first stage must go to
durable storage (i.e. HDFS), or re-executions will yield inconsistent
results. I haven't set up MultipleOutputs (MO) to effect the shuffle in
HDFS as Harsh describes, but it could be made to work. -C

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Jacques <wh...@gmail.com>.
Everybody, thanks for all the help.

Chris/Jason, assumption 1) is actually incorrect for my situation.
Nonetheless, I can see how one would basically use a dynamic-typing approach
to send the additional data as the first keys for each partition.  It seems
less than elegant but doable.

The solution Harsh J provides seems to be more elegant but I need to spend
some more time understanding exactly what the interplay between paths and
committing a job using MultipleOutputs is.  My first and second passes have
failed but it seems to indeed be committing the output of the map task even
when it is in a map+reduce job.  Like most things, I just need to spend some
more time digging through the code at a more detailed level.  I will come
back if I have more questions.

Regarding the final assumption that Harsh J mentions, "the framework guarantees
that the Reduce operation never starts until all Map tasks have finished":
it seems that, while pre-shuffling could be possible to get ready for running
the reduce, the reduce would always have to wait unless a Partitioner could
guarantee that a particular partition was complete before all maps were
completed.  Is there talk of this changing somehow?

Thanks again,
Jacques


On Sun, Feb 13, 2011 at 8:22 PM, Jason <ur...@gmail.com> wrote:

> I think this kind of partitioner is a little hackish. A more straightforward
> approach is to emit the extra data N times under special keys and write a
> partitioner that recognizes these keys and dispatches them accordingly
> among partitions 0..N-1.
> Also, if this data needs to be shipped to the reducers up front, that could
> easily be done using a custom sort comparator.

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Jason <ur...@gmail.com>.
I think this kind of partitioner is a little hackish. A more straightforward approach is to emit the extra data N times under special keys and write a partitioner that recognizes these keys and dispatches them accordingly among partitions 0..N-1.
Also, if this data needs to be shipped to the reducers up front, that could easily be done using a custom sort comparator.
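
The special-key idea above could be sketched roughly as follows, in plain Java with no Hadoop on the classpath. Everything named here (SpecialKeySketch, Key, SUMMARY_FIRST, the "split-7" id) is hypothetical and only illustrates the routing and ordering logic; in a real job the key would presumably be a WritableComparable, getPartition would live in a Partitioner subclass, and the comparator would be registered as the job's sort comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch; none of these types are Hadoop classes.
class SpecialKeySketch {
    // Composite key: summary records carry the reducer they are addressed
    // to; normal records use target = -1 and are routed by hash.
    static final class Key {
        final boolean summary;
        final int target;
        final String name;

        Key(boolean summary, int target, String name) {
            this.summary = summary;
            this.target = target;
            this.name = name;
        }
    }

    // Summary keys go to their addressed partition; others are hashed.
    static int getPartition(Key key, int numPartitions) {
        if (key.summary) {
            return key.target;
        }
        return (key.name.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Sort order that places summary keys ahead of normal keys, so each
    // reducer would see the summaries before any regular record.
    static final Comparator<Key> SUMMARY_FIRST =
        Comparator.comparing((Key k) -> !k.summary).thenComparing(k -> k.name);

    // What one map task would emit: one addressed copy per reducer.
    static List<Key> summaryKeys(String splitId, int numReducers) {
        List<Key> keys = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            keys.add(new Key(true, r, splitId));
        }
        return keys;
    }

    public static void main(String[] args) {
        int n = 3;
        for (Key k : summaryKeys("split-7", n)) {
            System.out.println(k.name + " -> partition " + getPartition(k, n));
        }
        List<Key> inOnePartition = new ArrayList<>();
        inOnePartition.add(new Key(false, -1, "apple"));
        inOnePartition.add(new Key(true, 0, "split-7"));
        inOnePartition.sort(SUMMARY_FIRST);
        System.out.println("first key: " + inOnePartition.get(0).name);
    }
}
```

Unlike the counter-based partitioner, this routing does not depend on the order in which records are written, at the cost of changing the map output key type.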

On Feb 13, 2011, at 8:05 PM, Chris Douglas <cd...@apache.org> wrote:

> If these assumptions are correct:
> 
> 0) Each map outputs one result, a few hundred bytes
> 1) The map output is deterministic, given an input split index
> 2) Every reducer must see the result from every map
> 
> Then just output the result N times, where N is the number of
> reducers, using a custom Partitioner that assigns the result to
> (records_seen++ % N), where records_seen is an int field on the
> partitioner.
> 
> If (1) does not hold, then write the first stage as a job with a single
> (optional) reduce, and the second stage as a map-only job processing
> the result. -C

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Chris Douglas <cd...@apache.org>.
If these assumptions are correct:

0) Each map outputs one result, a few hundred bytes
1) The map output is deterministic, given an input split index
2) Every reducer must see the result from every map

Then just output the result N times, where N is the number of
reducers, using a custom Partitioner that assigns the result to
(records_seen++ % N), where records_seen is an int field on the
partitioner.

If (1) does not hold, then write the first stage as a job with a single
(optional) reduce, and the second stage as a map-only job processing
the result. -C
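
A minimal sketch of the round-robin partitioner described above, in plain Java with no Hadoop on the classpath. The class name RoundRobinSketch, the emitSummary helper and the sample values are assumptions for illustration; getPartition only mirrors the shape of Partitioner.getPartition(key, value, numPartitions), and a real job would extend Partitioner instead. It relies on assumption (0), that the summary is the map's only output, and assumes each map task gets its own partitioner instance so the counter starts at zero.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Hadoop Partitioner; not the real API class.
class RoundRobinSketch {
    // Number of records this partitioner instance has routed so far.
    private int recordsSeen = 0;

    // Ignores key and value and hands out partitions
    // 0, 1, ..., numPartitions-1, 0, 1, ... in turn.
    int getPartition(Object key, Object value, int numPartitions) {
        return recordsSeen++ % numPartitions;
    }

    // Simulates one map task writing its single summary once per reducer
    // and records which partition each copy lands in.
    static List<Integer> emitSummary(String summary, int numReducers) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            targets.add(partitioner.getPartition("split-7", summary, numReducers));
        }
        return targets;
    }

    public static void main(String[] args) {
        // With 4 reducers, each one receives exactly one copy.
        System.out.println(emitSummary("rows=1200", 4)); // prints [0, 1, 2, 3]
    }
}
```

If the map also emitted regular records through the same partitioner, the counter would no longer line up with the N summary copies, which is why the assumptions matter.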

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Harsh J <qw...@gmail.com>.
From my experience, writing data is possible using MultipleOutputs (MO) on
both the Map and Reduce sides of a single MR job. All data written to the MO
name on the map side is committed just as it would be if the job were a
map-only job (there's no difference, since a map task does not wait for
reduce tasks to begin; it is independent of what the job plan is). Note
that MO uses direct record writers instead of the MapOutputBuffer class that
the default collector uses in a Map+Reduce job (to write to the local
filesystem for the ReduceTask to pick up). Thus your data should be available
on the Reduce side, provided the framework guarantees that the Reduce
operation never starts until all Map tasks have finished (which is the case
right now).

On Mon, Feb 14, 2011 at 9:05 AM, Jacques <wh...@gmail.com> wrote:
> It was my understanding, based on the FAQ and my personal experience, that
> using the MultipleOutputs class, or just relying on OutputCommitter, only
> works for the final phase of the job.  (E.g. the reduce phase in a
> map+reduce job and the map phase only in the case of reducer=NONE).  In the
> case I'm talking about, I want the map output to be committed and available
> to the reducers.  If I understand the intricacies of MapReduce, the map
> output of a full map+reduce job is never put onto HDFS but is rather
> streamed directly from the mapper to the requesting reducers.  To use (2)
> effectively, I only want to commit the secondary output to HDFS if the map
> task is completed successfully.
>
> This seems to require either:
> a) Assuming that the first call to map.cleanup for a particular
> split is the definitive call for that split (and thus committing the
> secondary information at that point)
> b) Or somehow always committing the map output to directories named for that
> task attempt and then hooking in a delete of the map task output for those
> map tasks which weren't committed.
>
> Am I missing something and/or over-complicating things?
>
> Thanks for your help
> Jacques
>



-- 
Harsh J
www.harshj.com

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Jacques <wh...@gmail.com>.
It was my understanding, based on the FAQ and my personal experience, that
using the MultipleOutputs class, or just relying on OutputCommitter, only
works for the final phase of the job.  (E.g. the reduce phase in a
map+reduce job and the map phase only in the case of reducer=NONE).  In the
case I'm talking about, I want the map output to be committed and available
to the reducers.  If I understand the intricacies of MapReduce, the map
output of a full map+reduce job is never put onto HDFS but is rather
streamed directly from the mapper to the requesting reducers.  To use (2)
effectively, I only want to commit the secondary output to HDFS if the map
task is completed successfully.

This seems to require either:
a) Assuming that the first call to map.cleanup for a particular
split is the definitive call for that split (and thus committing the
secondary information at that point)
b) Or somehow always committing the map output to directories named for that
task attempt and then hooking in a delete of the map task output for those
map tasks which weren't committed.

Am I missing something and/or over-complicating things?

Thanks for your help
Jacques

On Sun, Feb 13, 2011 at 6:54 PM, Harsh J <qw...@gmail.com> wrote:

> With just HDFS, IMO a good approach would be (2). See this FAQ on
> task-specific HDFS output directories you can use:
>
> http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F
> .
> It'd also be much easier to use the MultipleOutputs class (or other
> such utilities) for writing the extra data, as they also prefix -m- or
> -r- in the filenames, based on the task type.

Re: Best approach for accessing secondary map task outputs from reduce tasks?

Posted by Harsh J <qw...@gmail.com>.
With just HDFS, IMO a good approach would be (2). See this FAQ on
task-specific HDFS output directories you can use:
http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F.
It'd also be much easier to use the MultipleOutputs class (or other
such utilities) for writing the extra data, as they also prefix -m- or
-r- in the filenames, based on the task type.
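
As a rough illustration of how the -m- / -r- naming could be used on the reading side, the sketch below picks the map-side files for one named output out of a directory listing. It is plain Java with no Hadoop dependency; the class name SideFileNames, the named output "summary" and the sample listing are hypothetical, and it assumes file names of the form <named output>-m-<5-digit task number> with no extension. In a real reducer the listing would come from the job's output directory (for example via FileSystem.listStatus), which is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of Hadoop.
class SideFileNames {
    // Assumed naming: <namedOutput>-m-<task number> for map tasks and
    // <namedOutput>-r-<task number> for reduce tasks.
    private static final Pattern PART =
        Pattern.compile("(\\w+)-([mr])-(\\d{5})");

    // Returns the names written by map tasks for the given named output,
    // in listing order.
    static List<String> mapSideFiles(List<String> fileNames, String namedOutput) {
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            Matcher m = PART.matcher(name);
            if (m.matches()
                    && m.group(1).equals(namedOutput)
                    && m.group(2).equals("m")) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
            "summary-m-00000", "part-r-00000", "_logs", "summary-m-00001");
        // prints [summary-m-00000, summary-m-00001]
        System.out.println(mapSideFiles(listing, "summary"));
    }
}
```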




-- 
Harsh J
www.harshj.com