You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by Ricky Ho <rh...@adobe.com> on 2008/11/06 18:29:03 UTC

Hadoop Design Question

Hi,

While exploring how Hadoop fits in our usage scenarios, there are 4 recurring issues keep popping up.  I don't know if they are real issues or just our misunderstanding of Hadoop.  Can any expert shed some light here ?

Disk I/O overhead
==================
- The output of a Map task is written to a local disk and then later on upload to the Reduce task.  While this enable a simple recovery strategy when the map task failed, it incur additional disk I/O overhead.

- For example, in our popular Hadoop example of calculating the approximation of "Pi", there isn't any input data.  The map tasks in this example, should just directly feed its output to the reduce task.  So I am wondering if there is an option to bypassing the step of writing the map result to the local disk.


Pipelining between Map & Reduce phases is not possible
=======================================================
- In the current setting, it sounds like no reduce task will be started before all map tasks have completed.  In case if there are a few slow running map tasks, the whole job will be delayed.

- The overall job execution can be shortened if the reduce tasks can starts its processing as soon as some map results are available rather than waiting for all the map tasks to complete.


Pipelining between jobs
========================
- In many cases, we've found the parallel computation doesn't involve just one single map/reduce job, but multiple inter-dependent map/reduce jobs then work together in some coordinating fashion.

- Again, I haven't seen any mechanism available for 2 MapReduce jobs to directly interact with each other.  Job1 must write its output to HDFS for Job2 to pickup. On the other hand, once the "map" phase of a Job2 has started, all its input HDFS files has to be freezed (in other words, Job1 cannot append more records into the HDFS files)

- Therefore it is impossible for the reduce phase of Job1 to stream its output data to a file while the map phase of Job2 start reading the same file.  Job2 can only start after ALL REDUCE TASKS of Job1 is completed, which makes pipelining between jobs impossible.


No parallelism of reduce task with one key
===========================================
- Parallelism only happens in the map phase, as well as reduce phase (on different keys).  But there is no parallelism within a reduce process of a particular key

- This means the partitioning function has to be chosen carefully to make sure the workload of the reduce processes is balanced.  (maybe not a big deal)

- Is there any thoughts of running a pool of reduce tasks on the same key and have they combine their results later ?


Rgds, Ricky

Re: More Hadoop Design Question

Posted by Owen O'Malley <om...@apache.org>.
On Nov 6, 2008, at 2:30 PM, Ricky Ho wrote:

> Hmmm, sounds like the combiner is invoked after the map() process  
> completed for the file split.

No. The data path is complex, but the combiner is called when the map  
outputs are being spilled to disk. So roughly, the map will output  
key, value pairs until the io.sort.mb buffer is full, the contents are  
sorted and fed to the combiner. The output of the combiner is written  
to disk. When there are enough spills on disk, it will merge them  
together, call the combiner, and write to disk. When the map finishes,  
the final multi-level merge is done.

Since the reduce is also doing multi-level sort, it will also call the  
combiner when a merge is done (other than the final merge, which is  
fed into the reduce).

> That means, before the combiner function starts, all the  
> intermediate map() output result will be kept in memory ?  Any  
> comment on the memory footprint consumption ?

The memory is bound by io.sort.mb.

>  I think a sufficient condition is just to make sure the reduce task  
> will not COMPLETE before all the map tasks has completed.  We don't  
> need to make sure the reduce task will not START before all maps  
> tasks has completed.  This can be achieved easily by letting the  
> iterator.next() call within the reduce() method blocked.

*Sigh* no. The reduce function is invoked once per a unique key. The  
reduce function is called in ascending order of keys. Since the final  
map may return a's when previously you've only seen b's and c's. You  
can't call the reduce with the b, you can't later call it with the a.

> There is another potential issue in the reduce() API, can you  
> explain why do we need to expose the OutputCollector to the reduce()  
> method ?  For example, is it possible that the "key" in the  
> output.collect() be a different key from the reduce method  
> parameter ?  What happen if two reduce method (start with different  
> keys) writing their output on the same key ?

The reduce is allowed to have different input and output types. There  
are *four* type parameters.

Reducer<KeyIn, ValueIn, KeyOut, ValueOut>

The output of the reduce is not resorted. If the reduce doesn't use  
the same key as the input, the output of the reduce won't be sorted.  
Duplicate keys on reduce output (either within the same reduce or  
different ones, is not a problem for the framework.)

> However, this requires some change of the current Reducer  
> interface.  Currently the reduce() method is called once per key.   
> We want that to be called once per map result (within the same  
> key).  What I mean is the following interface ...

There is a library that lets you run a chain of maps, if that is the  
semantics you are looking for. For map/reduce, the sort is a very  
fundamental piece. If you don't need sort between map and reduce, you  
can set reduces = 0 and run much faster.

> Does it make sense ?

Not really. Most map/reduce applications need the other semantics.

-- Owen

More Hadoop Design Question

Posted by Ricky Ho <rh...@adobe.com>.
Bryan and Owen, thanks for your prompted response and comment.

Hmmm, sounds like the combiner is invoked after the map() process completed for the file split.  In other words, the combiner will not started before all the map() within that file split is completed (for the same reason that the combiner result won't be accurate otherwise).  Is this correct ?

That means, before the combiner function starts, all the intermediate map() output result will be kept in memory ?  Any comment on the memory footprint consumption ?

Regarding your comment on the final "reduce" result will be incorrect if we let the map tasks and reduce tasks running concurrently.  I think a sufficient condition is just to make sure the reduce task will not COMPLETE before all the map tasks has completed.  We don't need to make sure the reduce task will not START before all maps tasks has completed.  This can be achieved easily by letting the iterator.next() call within the reduce() method blocked.

Look at a typical reduce method ...

class MyReducer implements Reducer {
        void reduce(Text key, Iterator values, OutputCollector output, ...) {
                Object accum = new Result();
                while (values.hasNext()) {
                        Object value = values.next().get();
                        accum = accumulate(accum, value);
                }
                output.collect(key, accum);
        }
}

Think about the reduce() method is called much earlier, while the map tasks still working.  When reaching the line "values.next()", the reduce task will be blocked if there is no map result available (ie: when the map tasks doesn't feed him fast enough).  The final result is still accurate because the reduce() method will not return before all the map tasks has completed.  This way we can have the reduce() task overlapping with map() task to shorten the overall job duration.  Right ?


There is another potential issue in the reduce() API, can you explain why do we need to expose the OutputCollector to the reduce() method ?  For example, is it possible that the "key" in the output.collect() be a different key from the reduce method parameter ?  What happen if two reduce method (start with different keys) writing their output on the same key ?


You may notice that in my earlier proposed blocked call in values.next(), having a lot of blocking threads (one thread per key) is not a good idea.  Instead of blocking within the reduce() method, it is much better to have Hadoop framework to invoke the reduce method after some (but not all) map results are available.

However, this requires some change of the current Reducer interface.  Currently the reduce() method is called once per key.  We want that to be called once per map result (within the same key).  What I mean is the following interface ...

public interface Reducer {
        Writable init();
        Writable reduce(Object key, Writable value, Writable accum);
}

Now implement a WordCountReducer will look like the following ...

public class WordCountReducer implements Reducer {
        public Writable init() {
                return new IntWritable(0);
        }

        public Writable reduce(Object key, IntWritable value, IntWritable accum) {
                return new IntWritable(accum.get() + value.get());
        }
}


Summarizing the difference of this interface (from the original one)

1) The reduce() method called per map output, rather than called per key.  Therefore, the Hadoop framework doesn't need to wait for the whole map process to complete before starting the reduce, performance will be improved by overlapping the map and reduce execution.

2) OutputCollector is not exposed to the reduce() method.  Hadoop framework will write out the final reduced result using the same key.

Does it make sense ?

Rgds,
Ricky

-----Original Message-----
From: Bryan Duxbury [mailto:bryan@rapleaf.com]
Sent: Thursday, November 06, 2008 10:36 AM
To: core-user@hadoop.apache.org
Subject: Re: Hadoop Design Question

Comments inline.

On Nov 6, 2008, at 9:29 AM, Ricky Ho wrote:

> Hi,
>
> While exploring how Hadoop fits in our usage scenarios, there are 4
> recurring issues keep popping up.  I don't know if they are real
> issues or just our misunderstanding of Hadoop.  Can any expert shed
> some light here ?
>
> Disk I/O overhead
> ==================
> - The output of a Map task is written to a local disk and then
> later on upload to the Reduce task.  While this enable a simple
> recovery strategy when the map task failed, it incur additional
> disk I/O overhead.
>
> - For example, in our popular Hadoop example of calculating the
> approximation of "Pi", there isn't any input data.  The map tasks
> in this example, should just directly feed its output to the reduce
> task.  So I am wondering if there is an option to bypassing the
> step of writing the map result to the local disk.

In most data-intensive map/reduce jobs, you have to spill your map
output to disk at some point because you will run out of memory
otherwise. Additionally, Pi calculation is a really bad example,
because you could always start "reducing" any pairs together
arbitrarily. This is because pi calculation is commutative and
associative. We have a special construct for situations like that
called a "combiner", which is basically a map-side reducer.

>
>
> Pipelining between Map & Reduce phases is not possible
> =======================================================
> - In the current setting, it sounds like no reduce task will be
> started before all map tasks have completed.  In case if there are
> a few slow running map tasks, the whole job will be delayed.
>
> - The overall job execution can be shortened if the reduce tasks
> can starts its processing as soon as some map results are available
> rather than waiting for all the map tasks to complete.

You can't start reducing until all map tasks are complete because
until all map tasks complete, you can't do an accurate sort of all
intermediate key/value pairs. That is, if you just started reducing
the results of a single map task immediately, you might have other
values for some keys that come from different map tasks, and your
reduce would be inaccurate. In theory if you know that each map task
produces keys only in a certain range, you could start reducing
immediately after the map task finishes, but that seems like an
unlikely case.

>
>
> Pipelining between jobs
> ========================
> - In many cases, we've found the parallel computation doesn't
> involve just one single map/reduce job, but multiple inter-
> dependent map/reduce jobs then work together in some coordinating
> fashion.
>
> - Again, I haven't seen any mechanism available for 2 MapReduce
> jobs to directly interact with each other.  Job1 must write its
> output to HDFS for Job2 to pickup. On the other hand, once the
> "map" phase of a Job2 has started, all its input HDFS files has to
> be freezed (in other words, Job1 cannot append more records into
> the HDFS files)
>
> - Therefore it is impossible for the reduce phase of Job1 to stream
> its output data to a file while the map phase of Job2 start reading
> the same file.  Job2 can only start after ALL REDUCE TASKS of Job1
> is completed, which makes pipelining between jobs impossible.

Certainly, many transformations take more than one map/reduce job.
However, very few could actually be pipelined such that the output of
one fed directly into another without an intermediate stop in a file.
If the first job does any grouping or sorting, then the reduce is
necessary and it will have to write out to a file before anything
else can go on. If the second job also does grouping or sorting, then
you definitely need two jobs. If the second job doesn't do grouping
or sorting, then it can probably be collapsed into either the map or
reduce of the first job.

>
>
> No parallelism of reduce task with one key
> ===========================================
> - Parallelism only happens in the map phase, as well as reduce
> phase (on different keys).  But there is no parallelism within a
> reduce process of a particular key
>
> - This means the partitioning function has to be chosen carefully
> to make sure the workload of the reduce processes is balanced.
> (maybe not a big deal)
>
> - Is there any thoughts of running a pool of reduce tasks on the
> same key and have they combine their results later ?

I think you will find very few situations where you have only one key
on reduce. If you do, it's probably a scenario where you can use a
combiner and eliminate the problem. Basically all map/reduce jobs
I've worked on have a large number of keys going into the reduce phase.


>
> Rgds, Ricky


Re: Hadoop Design Question

Posted by Bryan Duxbury <br...@rapleaf.com>.
Comments inline.

On Nov 6, 2008, at 9:29 AM, Ricky Ho wrote:

> Hi,
>
> While exploring how Hadoop fits in our usage scenarios, there are 4  
> recurring issues keep popping up.  I don't know if they are real  
> issues or just our misunderstanding of Hadoop.  Can any expert shed  
> some light here ?
>
> Disk I/O overhead
> ==================
> - The output of a Map task is written to a local disk and then  
> later on upload to the Reduce task.  While this enable a simple  
> recovery strategy when the map task failed, it incur additional  
> disk I/O overhead.
>
> - For example, in our popular Hadoop example of calculating the  
> approximation of "Pi", there isn't any input data.  The map tasks  
> in this example, should just directly feed its output to the reduce  
> task.  So I am wondering if there is an option to bypassing the  
> step of writing the map result to the local disk.

In most data-intensive map/reduce jobs, you have to spill your map  
output to disk at some point because you will run out of memory  
otherwise. Additionally, Pi calculation is a really bad example,  
because you could always start "reducing" any pairs together  
arbitrarily. This is because pi calculation is commutative and  
associative. We have a special construct for situations like that  
called a "combiner", which is basically a map-side reducer.

>
>
> Pipelining between Map & Reduce phases is not possible
> =======================================================
> - In the current setting, it sounds like no reduce task will be  
> started before all map tasks have completed.  In case if there are  
> a few slow running map tasks, the whole job will be delayed.
>
> - The overall job execution can be shortened if the reduce tasks  
> can starts its processing as soon as some map results are available  
> rather than waiting for all the map tasks to complete.

You can't start reducing until all map tasks are complete because  
until all map tasks complete, you can't do an accurate sort of all  
intermediate key/value pairs. That is, if you just started reducing  
the results of a single map task immediately, you might have other  
values for some keys that come from different map tasks, and your  
reduce would be inaccurate. In theory if you know that each map task  
produces keys only in a certain range, you could start reducing  
immediately after the map task finishes, but that seems like an  
unlikely case.

>
>
> Pipelining between jobs
> ========================
> - In many cases, we've found the parallel computation doesn't  
> involve just one single map/reduce job, but multiple inter- 
> dependent map/reduce jobs then work together in some coordinating  
> fashion.
>
> - Again, I haven't seen any mechanism available for 2 MapReduce  
> jobs to directly interact with each other.  Job1 must write its  
> output to HDFS for Job2 to pickup. On the other hand, once the  
> "map" phase of a Job2 has started, all its input HDFS files has to  
> be freezed (in other words, Job1 cannot append more records into  
> the HDFS files)
>
> - Therefore it is impossible for the reduce phase of Job1 to stream  
> its output data to a file while the map phase of Job2 start reading  
> the same file.  Job2 can only start after ALL REDUCE TASKS of Job1  
> is completed, which makes pipelining between jobs impossible.

Certainly, many transformations take more than one map/reduce job.  
However, very few could actually be pipelined such that the output of  
one fed directly into another without an intermediate stop in a file.  
If the first job does any grouping or sorting, then the reduce is  
necessary and it will have to write out to a file before anything  
else can go on. If the second job also does grouping or sorting, then  
you definitely need two jobs. If the second job doesn't do grouping  
or sorting, then it can probably be collapsed into either the map or  
reduce of the first job.

>
>
> No parallelism of reduce task with one key
> ===========================================
> - Parallelism only happens in the map phase, as well as reduce  
> phase (on different keys).  But there is no parallelism within a  
> reduce process of a particular key
>
> - This means the partitioning function has to be chosen carefully  
> to make sure the workload of the reduce processes is balanced.   
> (maybe not a big deal)
>
> - Is there any thoughts of running a pool of reduce tasks on the  
> same key and have they combine their results later ?

I think you will find very few situations where you have only one key  
on reduce. If you do, it's probably a scenario where you can use a  
combiner and eliminate the problem. Basically all map/reduce jobs  
I've worked on have a large number of keys going into the reduce phase.


>
> Rgds, Ricky


Re: Hadoop Design Question

Posted by Owen O'Malley <om...@apache.org>.
On Nov 6, 2008, at 11:29 AM, Ricky Ho wrote:

> Disk I/O overhead
> ==================
> - The output of a Map task is written to a local disk and then later  
> on upload to the Reduce task.  While this enable a simple recovery  
> strategy when the map task failed, it incur additional disk I/O  
> overhead.

That is correct. However, Linux does very well at using extra ram for  
buffer caches, so as long as you enable write behind it won't be a  
performance problem. You are right that the primary motivation is both  
recoverability and not needing the reduces running until after maps  
finish.

>   So I am wondering if there is an option to bypassing the step of  
> writing the map result to the local disk.

Currently no.

> - In the current setting, it sounds like no reduce task will be  
> started before all map tasks have completed.  In case if there are a  
> few slow running map tasks, the whole job will be delayed.

The application's reduce function can't start until the last map  
finishes because the input to the reduce is sorted. Since the last map  
may generate the first keys that must be given to the reduce, the  
reduce must wait.

> - The overall job execution can be shortened if the reduce tasks can  
> starts its processing as soon as some map results are available  
> rather than waiting for all the map tasks to complete.

But it would violate the specification of the framework that the input  
to reduce is completely sorted.

> - Therefore it is impossible for the reduce phase of Job1 to stream  
> its output data to a file while the map phase of Job2 start reading  
> the same file.  Job2 can only start after ALL REDUCE TASKS of Job1  
> is completed, which makes pipelining between jobs impossible.

It is currently not supported, but the framework could be extended to  
let the client add input splits after the job has started. That would  
remove the hard synchronization between jobs.

> - This means the partitioning function has to be chosen carefully to  
> make sure the workload of the reduce processes is balanced.  (maybe  
> not a big deal)

Yes, the partitioner must balance the workload between the reduces.

> - Is there any thoughts of running a pool of reduce tasks on the  
> same key and have they combine their results later ?

That is called the combiner. It is called multiple times as the data  
is merged together. See the word count example. If the reducer does  
data reduction, using combiners is very important for performance.

-- Owen