Posted to common-user@hadoop.apache.org by Sagar Naik <sn...@attributor.com> on 2009/04/10 20:12:48 UTC

Multithreaded Reducer

Hi,
I would like to implement a multi-threaded reducer.
As I understand it, the framework does not provide one because the
output is expected to be sorted.

However, in my case I don't need the output sorted.

Can you please point out any other issues, or would it be safe to do so?

-Sagar

Re: Multithreaded Reducer

Posted by Owen O'Malley <om...@apache.org>.
On Apr 10, 2009, at 11:12 AM, Sagar Naik wrote:

> Hi,
> I would like to implement a multi-threaded reducer.
> As I understand it, the framework does not provide one because the
> output is expected to be sorted.
>
> However, in my case I don't need the output sorted.

You'd probably want to make a blocking concurrent queue of the key/value
pairs that are given to the reducer, then have a pool of reducer threads
that pull from the queue. It can be modeled on the multi-threaded map
runner. Do be aware that you'll need to clone the keys and values that
are given to reduce.

-- Owen
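
A minimal sketch of the queue-plus-worker-pool shape described above, in plain Java with no Hadoop dependency. The names `ReduceInput` and `QueueFedReducers` are hypothetical, keys and values are modeled as strings, and the "reduce" work is just counting values; a real reducer would clone its Writable keys and values where this sketch copies the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for one reduce() call: a key plus a copy of its values.
final class ReduceInput {
    final String key;
    final List<String> values;

    ReduceInput(String key, List<String> values) {
        this.key = key;
        this.values = new ArrayList<>(values); // copy, since the caller may reuse its list
    }
}

final class QueueFedReducers {
    // Sentinel telling a worker that no more input will arrive.
    private static final ReduceInput POISON = new ReduceInput("", new ArrayList<>());

    private final BlockingQueue<ReduceInput> queue;
    private final List<Thread> workers = new ArrayList<>();
    final ConcurrentLinkedQueue<String> output = new ConcurrentLinkedQueue<>();

    QueueFedReducers(int workerCount, int queueCapacity) {
        queue = new ArrayBlockingQueue<>(queueCapacity);
        for (int i = 0; i < workerCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        ReduceInput in = queue.take(); // blocks until work is available
                        if (in == POISON) {
                            return;
                        }
                        // Placeholder "reduce": emit the key and its value count.
                        output.add(in.key + "=" + in.values.size());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }
    }

    // Called from the single reduce thread; blocks while the queue is full.
    void submit(String key, List<String> values) {
        try {
            queue.put(new ReduceInput(key, values));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while queueing", e);
        }
    }

    // Called once at the end: one sentinel per worker, then wait for all of them.
    void finish() {
        try {
            for (int i = 0; i < workers.size(); i++) {
                queue.put(POISON);
            }
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
    }
}
```

In this sketch, `submit` would be called from the body of reduce and `finish` from close. Because the queue is bounded, `submit` blocks once the workers fall behind, so memory use stays capped.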

Re: Multithreaded Reducer

Posted by Todd Lipcon <to...@cloudera.com>.
On Fri, Apr 10, 2009 at 12:31 PM, jason hadoop <ja...@gmail.com> wrote:

> Hi Sagar!
>
> There is no reason for the body of your reduce method to do more than copy
> and queue the key value set into an execution pool.
>

Agreed. You probably want to use either a bounded queue on your execution
pool, or even a SynchronousQueue to do handoff to executor threads.
Otherwise your reducer will churn through all of its inputs at I/O rate,
potentially fill up RAM, and report "100%" complete well before it's actually
complete. Something like:

BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
threadPool = new ThreadPoolExecutor(WORKER_THREAD_COUNT,
WORKER_THREAD_COUNT, 5, TimeUnit.SECONDS, queue);
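
One caveat about the snippet above, sketched here as a variation rather than as what the original poster intended: when a ThreadPoolExecutor is at its maximum size and its queue cannot accept a task (always the case for a SynchronousQueue with no idle worker), the task goes to the rejection handler, and the default handler throws RejectedExecutionException. Passing `ThreadPoolExecutor.CallerRunsPolicy` makes the submitting thread run the task itself, which slows the producer down instead of failing. `HandoffPool` and `runAll` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class HandoffPool {
    // Fixed-size pool with direct handoff; a busy pool makes the caller do the work.
    static ThreadPoolExecutor create(int workerThreadCount) {
        return new ThreadPoolExecutor(
            workerThreadCount, workerThreadCount,
            5, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits `tasks` short jobs and returns how many of them completed.
    static int runAll(int workerThreadCount, int tasks) {
        ThreadPoolExecutor pool = create(workerThreadCount);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(2); // stand-in for I/O-bound work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // no new tasks; already accepted ones still run
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }
}
```

The trade-off is that the reduce thread occasionally does a unit of work itself. If that is undesirable, a custom handler that blocks until a worker is free is another option.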

>
> The close method will need to wait until all of the items finish
> execution and potentially keep the heartbeat up with the task tracker by
> periodically reporting something. Sadly right now the reporter has to be
> grabbed from the reduce method as configure and close do not get an
> instance.
>

+1. You probably want to call reporter.progress() after each item is
processed by the worker threads.
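
For the close side, here is a rough sketch of waiting for the pool to drain while still signalling liveness. `ProgressSink` is a hypothetical stand-in for the single `progress()` call on the reporter; as noted in the quoted text, the real reporter would have to be captured in a field during a reduce call. `DrainOnClose` and `closeAndWait` are likewise illustrative names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the one reporter method this sketch needs.
interface ProgressSink {
    void progress();
}

final class DrainOnClose {
    // Waits for all submitted work to finish, pinging `sink` between short waits
    // so the task does not look hung while the workers are still busy.
    static void closeAndWait(ExecutorService pool, ProgressSink sink, long pollMillis) {
        pool.shutdown(); // stop accepting new work; queued tasks still run
        try {
            while (!pool.awaitTermination(pollMillis, TimeUnit.MILLISECONDS)) {
                sink.progress();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow(); // give up on remaining work if we are interrupted
            Thread.currentThread().interrupt();
        }
    }
}
```

The poll interval only needs to be comfortably shorter than the task timeout configured on the cluster; the exact value is an assumption to tune.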


>
> I believe the key and value objects are reused by the framework on the next
> call to reduce, so making a copy before queuing them into your thread pool
> is important.
>

+1 here too. You will definitely run into issues if you don't make a deep
copy.
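
To illustrate why the copy matters, this small simulation refills a single mutable object for every record, roughly the way the quoted text says the framework reuses key and value objects. `MutableText` and `ReuseDemo` are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mutable, framework-owned key or value object.
final class MutableText {
    private String content = "";

    MutableText() {}

    // Copy constructor: snapshots the current content.
    MutableText(MutableText other) {
        this.content = other.content;
    }

    void set(String s) {
        content = s;
    }

    @Override
    public String toString() {
        return content;
    }
}

final class ReuseDemo {
    // Simulates a framework that refills one object for every record and a
    // reducer that queues either the shared reference or a copy of it.
    static List<MutableText> collect(String[] records, boolean copy) {
        MutableText reused = new MutableText();
        List<MutableText> queued = new ArrayList<>();
        for (String r : records) {
            reused.set(r);
            queued.add(copy ? new MutableText(reused) : reused);
        }
        return queued;
    }
}
```

With records "a", "b", "c" and `copy` set to false, every queued entry reads "c", because all three entries are the same object. With `copy` set to true the entries read "a", "b", "c". For real Writables, a helper such as `WritableUtils.clone` may be an option, but check it against your Hadoop version.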

-Todd

Re: Multithreaded Reducer

Posted by jason hadoop <ja...@gmail.com>.
Hi Sagar!

There is no reason for the body of your reduce method to do more than copy
and queue the key value set into an execution pool.

The close method will need to wait until all of the items finish
execution and potentially keep the heartbeat up with the task tracker by
periodically reporting something. Sadly right now the reporter has to be
grabbed from the reduce method as configure and close do not get an
instance.

I believe the key and value objects are reused by the framework on the next
call to reduce, so making a copy before queuing them into your thread pool
is important.



-- 
Alpha Chapters of my book on Hadoop are available
http://www.apress.com/book/view/9781430219422

Re: Multithreaded Reducer

Posted by Aaron Kimball <aa...@cloudera.com>.
At that level of parallelism, you're right that the process overhead would
be too high.
- Aaron


On Fri, Apr 10, 2009 at 11:36 AM, Sagar Naik <sn...@attributor.com> wrote:

>
> Two things:
> - Multi-threading is preferred over multiple processes here. The processing
> I'm planning is I/O bound, so I can really take advantage of many threads
> (100 threads).
> - Correct me if I'm wrong: the next MR job in the pipeline will have an
> increased number of splits to process, because the number of reducer outputs
> (from the previous job) has increased. This leads to an increase
> in the map-task completion time.
>
>
>
> -Sagar

Re: Multithreaded Reducer

Posted by Sagar Naik <sn...@attributor.com>.
Two things:
 - Multi-threading is preferred over multiple processes here. The processing
I'm planning is I/O bound, so I can really take advantage of many threads
(100 threads).
 - Correct me if I'm wrong: the next MR job in the pipeline will have an
increased number of splits to process, because the number of reducer outputs
(from the previous job) has increased. This leads to an increase
in the map-task completion time.



-Sagar
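
The second point can be put into rough numbers. The sketch below uses a deliberately simplified model, which is an assumption and not the framework's exact split logic: each reduce task writes one output file, outputs are evenly sized, every file yields at least one split, and larger files yield one split per `splitBytes`, rounded up. `SplitEstimate` is a hypothetical name.

```java
final class SplitEstimate {
    // Estimates the input splits the next job sees, under the simplified model
    // described above (one file per reducer, evenly sized, at least one split each).
    static long splitsFor(long totalBytes, int reducerCount, long splitBytes) {
        long bytesPerFile = totalBytes / reducerCount;
        long splitsPerFile = Math.max(1, (bytesPerFile + splitBytes - 1) / splitBytes);
        return splitsPerFile * reducerCount;
    }
}
```

Under this model, 1000 MB of reduce output with a 64 MB split size gives 20 splits when written by 10 reducers and 100 splits when written by 100 reducers, so the next job schedules five times as many map tasks for the same data.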

Aaron Kimball wrote:
> Rather than implementing a multi-threaded reducer, why not simply increase
> the number of reducer tasks per machine via
> mapred.tasktracker.reduce.tasks.maximum, and increase the total number of
> reduce tasks per job via mapred.reduce.tasks to ensure that they're all
> filled. This will effectively utilize a higher number of cores.
>
> - Aaron

Re: Multithreaded Reducer

Posted by Aaron Kimball <aa...@cloudera.com>.
Rather than implementing a multi-threaded reducer, why not simply increase
the number of reducer tasks per machine via
mapred.tasktracker.reduce.tasks.maximum, and increase the total number of
reduce tasks per job via mapred.reduce.tasks to ensure that they're all
filled? This will effectively utilize a higher number of cores.

- Aaron
