Posted to common-user@hadoop.apache.org by 李钰 <ca...@gmail.com> on 2010/06/17 09:13:09 UTC

Performance tuning of sort

Hi all,

I'm tuning the Hadoop sort benchmark; more specifically, I'm running tests
against the org.apache.hadoop.examples.Sort class. From reading the source
code, I think the map tasks are responsible for sorting the input data, and
the reduce tasks just merge the map outputs and write them into HDFS. But
there is one thing I can't understand: the time spent in the reduce phase of
each reduce task (that is, writing data into HDFS) differs from task to
task. Since the input data and operations of each reduce task are the same,
what could cause the execution times to differ? Is there anything wrong with
my understanding? Does anybody have any experience with this? I badly need
your help, thanks.

Best Regards,
Carp

Re: Performance tuning of sort

Posted by 李钰 <ca...@gmail.com>.
Hi Jeff,

Thanks a lot for your explanation. It really helps me understand the
details of the job workflow.

Hi all,

Thanks a lot for your help. One more question: the monitoring data shows
that iowait% is quite high. Do you think this is normal, given that a lot of
data is read and written, as well as copied over the network? The overall
input data to be sorted is 500GB, and my test environment is a 10-node
cluster. I've tried increasing related parameters such as "io.sort.mb" and
"io.sort.factor", and used LZO compression, but the iowait% still doesn't
seem to drop. Any comments? Thanks.

Best Regards,
Carp
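
One rough way to judge whether larger "io.sort.mb" and "io.sort.factor" values can still cut disk traffic is to estimate how many spill files a map task writes and how many merge rounds those spills need. The Python sketch below is a simplified model, not Hadoop's exact accounting: the function names are hypothetical, the 0.80 spill threshold is assumed from the 0.20-era default of "io.sort.spill.percent", and the space reserved for record metadata is ignored.

```python
import math


def estimate_spills(map_output_mb, io_sort_mb, spill_percent=0.80):
    """Approximate number of spill files written by one map task.

    Assumes a spill starts once the sort buffer is spill_percent full
    (assumed default) and ignores the record-metadata share of the buffer.
    """
    usable_mb = io_sort_mb * spill_percent
    return max(1, math.ceil(map_output_mb / usable_mb))


def estimate_merge_rounds(num_segments, io_sort_factor):
    """Approximate number of merge rounds needed to reach a single file.

    Each round is modelled as merging up to io_sort_factor segments at once.
    """
    rounds = 0
    while num_segments > 1:
        num_segments = math.ceil(num_segments / io_sort_factor)
        rounds += 1
    return rounds


if __name__ == "__main__":
    # Hypothetical figures: 128 MB of output per map task, io.sort.mb = 100.
    spills = estimate_spills(128, 100)
    print("spills per map task:", spills)
    print("merge rounds:", estimate_merge_rounds(spills, 10))
```

If this model already predicts a single spill and at most one merge round per task, raising the two parameters further is unlikely to reduce disk traffic, and the remaining iowait would more likely come from reading the input, the shuffle, and the HDFS writes themselves.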


Re: Performance tuning of sort

Posted by Jeff Zhang <zj...@gmail.com>.
The amount of data each reducer receives depends on the Partitioner. You can
think of the Partitioner as a hash function and each reducer as a bucket, so
you cannot expect every bucket to hold the same number of items.

A skewed data distribution will make a few reducers take much longer.
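
The bucket analogy can be made concrete with a small Python simulation. It is only a sketch: hash_partition mimics the "(hash & Integer.MAX_VALUE) % numReduceTasks" rule of Hadoop's HashPartitioner using Python's built-in hash, the function names are hypothetical, and the keys are made-up integers chosen so that one key dominates.

```python
from collections import Counter


def hash_partition(key, num_reducers):
    """Python analogue of (hashCode & Integer.MAX_VALUE) % numReduceTasks."""
    return (hash(key) & 0x7FFFFFFF) % num_reducers


def records_per_reducer(keys, num_reducers):
    """Count how many records land in each reducer's bucket."""
    counts = Counter(hash_partition(k, num_reducers) for k in keys)
    return [counts.get(r, 0) for r in range(num_reducers)]


if __name__ == "__main__":
    # Made-up skewed input: key 7 appears 90 times, keys 0..9 once each.
    keys = [7] * 90 + list(range(10))
    print(records_per_reducer(keys, 4))  # prints [3, 3, 2, 92]
```

Every record with the same key goes to the same bucket, so a single hot key is enough to give one reducer most of the work regardless of how many reducers are configured.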



-- 
Best Regards

Jeff Zhang

Re: Performance tuning of sort

Posted by 李钰 <ca...@gmail.com>.
Hi Jeff and Amogh,

Thanks for your comments! In my understanding, in the partitioning phase
before spilling to disk, the threads divide the data into partitions
corresponding to the number of reducers, as described in the Definitive
Guide. So I think the scale of input data should be the same for each
reducer. Please correct me if I have misunderstood anything, thanks.

As for the reduce phases, I did check the shuffle, sort and reduce times
through the JT UI, but found them quite different for each reduce task. Some
tasks have a longer shuffle time but a shorter reduce time, while others
have a shorter shuffle time but a longer reduce time. I set the number of
reducers large enough to let all reduce tasks run in parallel, and set the
"mapred.reduce.slowstart.completed.maps" parameter to 1.0 so that they all
start at the same time, once all map tasks have finished. I think this
should reduce the impact of the network and of waiting for map tasks to
finish during the shuffle phase. Then why is the time spent in shuffle still
so different? And since the reduce phase of a reduce task just writes sorted
data into HDFS, why does the time of the reduce phase differ?

Is anything wrong with my analysis? Any suggestions? Thanks a lot.

Dear all,

Any other comments? Thanks.

Best Regards,
Carp


Re: Performance tuning of sort

Posted by Jeff Zhang <zj...@gmail.com>.
The input of each reducer is not the same; it depends on the input data
distribution and the Partitioner.
The running time of each reducer consists of three phases: copy, sort and
reduce.
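
To illustrate how reducer input depends on both the data distribution and the partitioner, here is a minimal Python sketch of a range partitioner whose split points come from a sample, in the spirit of the sampling plus TotalOrderPartitioner approach mentioned in this thread. The function names, the sampling rule, and the key values are assumptions made for illustration, not Hadoop's implementation.

```python
import bisect


def split_points_from_sample(sample, num_reducers):
    """Pick num_reducers - 1 evenly spaced keys from a sorted sample."""
    ordered = sorted(sample)
    step = len(ordered) / num_reducers
    return [ordered[int(step * i)] for i in range(1, num_reducers)]


def partition_sizes(keys, split_points):
    """Count the keys that fall into each range defined by the split points."""
    sizes = [0] * (len(split_points) + 1)
    for key in keys:
        sizes[bisect.bisect_right(split_points, key)] += 1
    return sizes


if __name__ == "__main__":
    # Hypothetical sample: every tenth key of a 0..99 key space.
    splits = split_points_from_sample(range(0, 100, 10), 4)
    print(splits)  # prints [20, 50, 70]

    # Uniform keys: partitions are close to each other in size.
    print(partition_sizes(range(100), splits))  # prints [20, 30, 20, 30]

    # Same keys plus 50 extra copies of key 5, which the sample never saw.
    skewed = [5] * 50 + list(range(100))
    print(partition_sizes(skewed, splits))  # prints [70, 30, 20, 30]
```

Even with range partitioning, the split points are only as good as the sample: when the real data contains a hot key or range that the sample missed, one reducer ends up with far more input than the others.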


-- 
Best Regards

Jeff Zhang

Re: Performance tuning of sort

Posted by Amogh Vasekar <am...@yahoo-inc.com>.
>>Since the scale of input data and operations of each reduce task is the same, what may cause the execution time of reduce tasks different?

You should consider looking at the copy, shuffle and reduce times separately in the JT UI to get better information. Many (dynamic) factors, such as network congestion, the number of mappers a reducer is fetching from, and data skew in the keys sent to each reducer, will affect this number.

HTH,
Amogh
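
As a sketch of the per-phase comparison suggested here, the Python snippet below takes phase durations copied by hand from the JT UI and reports which phase varies most across reduce tasks. The phase names follow the shuffle/sort/reduce breakdown mentioned elsewhere in this thread; the function names are hypothetical and the timings are invented purely to show the shape of the input.

```python
PHASES = ("shuffle", "sort", "reduce")


def phase_spread(tasks):
    """Return max - min duration (seconds) for each phase across tasks."""
    spread = {}
    for phase in PHASES:
        times = [task[phase] for task in tasks]
        spread[phase] = max(times) - min(times)
    return spread


def widest_phase(tasks):
    """Name of the phase whose duration differs most between tasks."""
    spread = phase_spread(tasks)
    return max(spread, key=spread.get)


if __name__ == "__main__":
    # Invented example timings in seconds, one dict per reduce task.
    tasks = [
        {"shuffle": 120, "sort": 5, "reduce": 60},
        {"shuffle": 90, "sort": 6, "reduce": 150},
    ]
    print(phase_spread(tasks))
    print("largest variation in:", widest_phase(tasks))
```

Looking at the spread per phase separates the possible causes: variation in shuffle points towards network or fetch behaviour, while variation in the reduce phase points towards uneven input sizes or HDFS write speed.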

On 6/18/10 8:05 AM, "李钰" <ca...@gmail.com> wrote:

Hi Todd and Jeff,

Thanks a lot for your discussion; it's really helpful to me. I'd especially
like to thank Todd for his patient explanation, which helped me see the
working mechanism of Sort more clearly. And Jeff, thank you for reminding me
that sort uses TotalOrderPartitioner to do partitioning.
Based on your discussion, I've updated my understanding as follows:
The sorting happens on the map side during the spill process of each map
task. After that, the overall map outputs are partitioned by the
TotalOrderPartitioner, which decides the input range of each reducer.
Reducers fetch the map outputs assigned by the partitioner, merge them, and
write the results into HDFS.
Is this understanding right? Please correct me if you find any mistakes,
thanks.
If this understanding is right, then my question goes back to the original
one: since the scale of input data and the operations of each reduce task
are the same, what may cause the execution times of reduce tasks to differ?
All nodes used in my experiment are on the same rack, and they are
homogeneous.
Any suggestions will be highly appreciated, thanks.

Best Regards,
Carp

2010/6/18 Todd Lipcon <to...@cloudera.com>

> On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang <zj...@gmail.com> wrote:
>
> > Todd,
> >
> > Why's there a sorting in map task, the sorting here seems useless in my
> > opinion.
> >
> For map-only jobs there isn't. For jobs with reduce, typically the number
> of reduce tasks is smaller than the number of map tasks, so parallelizing
> the sort on the mappers and just doing merge on the reducers is
> beneficial. Second, this allows the combiner to run on the mapper by
> identifying when it has multiple outputs for the same key. Third, this
> allows improved compression on the map output (thus less intermediate data
> transfer) by putting similar keys near each other (hopefully within the
> compression window). Fourth, it kills two birds with one stone since the
> mappers already have to group outputs by the partition.
>
> -Todd
>
> > On Thu, Jun 17, 2010 at 9:26 AM, Todd Lipcon <to...@cloudera.com> wrote:
> > > On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang <zj...@gmail.com> wrote:
> > >
> > >> Your understanding of Sort is not right. The key concept of Sort is
> > >> the TotalOrderPartitioner. Actually before the map-reduce job, client
> > >> side will do sampling of input data to estimate the distribution of
> > >> input data. And the mapper do nothing, each reducer will fetch its
> > >> data according the TotalOrderPartitioner. The data in each reducer is
> > >> local sorted, and each reducer are sorted ( r0<r1<r2....), so the
> > >> overall result data is sorted.
> > >>
> > > The sorting happens on the map side, actually, during the spill
> > > process. The mapper itself is an identity function, but the map task
> > > code does perform a sort (on a <partition,key> tuple) as originally
> > > described in this thread. Reducers just do a merge of mapper outputs.
> > >
> > > -Todd
>


Re: Performance tuning of sort

Posted by 李钰 <ca...@gmail.com>.
Hi Todd and Jeff,

Thanks a lot for your discussion; it's really helpful to me. I especially
appreciate Todd's patient explanation, which helped me see the working
mechanism of Sort more clearly. And Jeff, thank you for reminding me that
Sort uses TotalOrderPartitioner to do the partitioning.

Based on your discussion, I have updated my understanding as follows:
Sorting happens on the map side, during the spill process of each map task.
After that, the map outputs are partitioned by TotalOrderPartitioner, which
decides the input range of each reducer. Reducers fetch the map outputs
assigned to them by the partitioner, merge them, and write the results to
HDFS.
Is this understanding right? Please correct me if you find any mistakes,
thanks.

If it is right, then I come back to my original question: since the amount
of input data and the operations of each reduce task are the same, what
could cause the execution times of the reduce tasks to differ? All nodes
used in my experiment are on the same rack, and they are homogeneous.
Any suggestion will be highly appreciated, thanks.
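
One way to test the assumption that every reduce task receives the same
amount of input is to count records per partition. The following is a minimal
sketch, assuming a made-up skewed key distribution and a simple modulo
partitioner; neither is taken from the Sort example itself, and the names
`partition` and `records_per_reducer` are hypothetical.

```python
from collections import Counter

NUM_REDUCERS = 4

def partition(key):
    # Simple modulo partitioner for integer keys (illustration only,
    # not the partitioner Hadoop actually uses).
    return key % NUM_REDUCERS

# Made-up skewed input: key 0 is far more frequent than the others.
keys = [0] * 70 + list(range(1, 31))

# Number of records each reducer would receive.
records_per_reducer = Counter(partition(k) for k in keys)

for reducer_id in range(NUM_REDUCERS):
    print(reducer_id, records_per_reducer[reducer_id])
```

If the counts differ widely, the reducers with more records have more data to
merge and write, even on identical hardware. For a real job, the reduce input
record counters reported for each task would give the actual numbers.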

Best Regards,
Carp

2010/6/18 Todd Lipcon <to...@cloudera.com>

> On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang <zj...@gmail.com> wrote:
>
> > Todd,
> >
> > Why is there a sort in the map task? The sorting here seems useless in my
> > opinion.
> >
> >
> For map-only jobs there isn't one. For jobs with a reduce phase, there are
> several reasons. First, the number of reduce tasks is typically smaller than
> the number of map tasks, so parallelizing the sort across the mappers and
> doing only a merge on the reducers is beneficial. Second, it allows the
> combiner to run on the map side by identifying when a mapper has multiple
> outputs for the same key. Third, it improves compression of the map output
> (and thus reduces intermediate data transfer) by putting similar keys near
> each other (hopefully within the compression window). Fourth, it kills two
> birds with one stone, since the mappers already have to group outputs by
> partition.
>
> -Todd

Re: Performance tuning of sort

Posted by Todd Lipcon <to...@cloudera.com>.
On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang <zj...@gmail.com> wrote:

> Todd,
>
> Why is there a sort in the map task? The sorting here seems useless in my
> opinion.
>
>
For map-only jobs there isn't one. For jobs with a reduce phase, there are
several reasons. First, the number of reduce tasks is typically smaller than
the number of map tasks, so parallelizing the sort across the mappers and
doing only a merge on the reducers is beneficial. Second, it allows the
combiner to run on the map side by identifying when a mapper has multiple
outputs for the same key. Third, it improves compression of the map output
(and thus reduces intermediate data transfer) by putting similar keys near
each other (hopefully within the compression window). Fourth, it kills two
birds with one stone, since the mappers already have to group outputs by
partition.
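
The second point, that sorted map output lets a combiner find records with
the same key, can be illustrated with a small sketch. It assumes a
word-count-style combiner that sums values; `combine_sorted` is a
hypothetical helper, not a Hadoop API.

```python
from itertools import groupby
from operator import itemgetter

def combine_sorted(records):
    """Sum the values of adjacent records that share a key.

    This only merges records that sit next to each other, which is
    why the input needs to be sorted by key first.
    """
    return [(key, sum(value for _, value in group))
            for key, group in groupby(records, key=itemgetter(0))]

map_output = [("b", 1), ("a", 1), ("b", 1), ("a", 1), ("c", 1)]

# After sorting, equal keys are adjacent and collapse into one record each.
combined = combine_sorted(sorted(map_output))

# Without sorting, no two equal keys are adjacent, so nothing is combined.
uncombined = combine_sorted(map_output)

print(combined)
print(len(uncombined))
```

Because the records are already ordered, the combiner needs only a single
pass and no per-key lookup table.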

-Todd





-- 
Todd Lipcon
Software Engineer, Cloudera

Re: Performance tuning of sort

Posted by Jeff Zhang <zj...@gmail.com>.
Todd,

Why is there a sort in the map task? The sorting here seems useless in my opinion.



On Thu, Jun 17, 2010 at 9:26 AM, Todd Lipcon <to...@cloudera.com> wrote:
> On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang <zj...@gmail.com> wrote:
>
>> Your understanding of Sort is not right. The key concept of Sort is
>> the TotalOrderPartitioner. Before the map-reduce job starts, the client
>> side samples the input data to estimate its distribution. The mapper
>> does nothing; each reducer fetches its data according to the
>> TotalOrderPartitioner. The data within each reducer is locally sorted,
>> and the reducers themselves are ordered (r0 < r1 < r2 ...), so the
>> overall result is sorted.
>>
>
> The sorting happens on the map side, actually, during the spill process. The
> mapper itself is an identity function, but the map task code does perform a
> sort (on a <partition,key> tuple) as originally described in this thread.
> Reducers just do a merge of mapper outputs.
>
> -Todd



-- 
Best Regards

Jeff Zhang

Re: Performance tuning of sort

Posted by Todd Lipcon <to...@cloudera.com>.
On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang <zj...@gmail.com> wrote:

> Your understanding of Sort is not right. The key concept of Sort is
> the TotalOrderPartitioner. Before the map-reduce job starts, the client
> side samples the input data to estimate its distribution. The mapper
> does nothing; each reducer fetches its data according to the
> TotalOrderPartitioner. The data within each reducer is locally sorted,
> and the reducers themselves are ordered (r0 < r1 < r2 ...), so the
> overall result is sorted.
>

The sorting happens on the map side, actually, during the spill process. The
mapper itself is an identity function, but the map task code does perform a
sort (on a <partition,key> tuple) as originally described in this thread.
Reducers just do a merge of mapper outputs.
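
The division of work described above can be sketched as follows: each map
task sorts its own output by (partition, key), and each reducer merges the
sorted slices assigned to it. This is an illustration only, not Hadoop code;
the first-letter `partition` function and the helper names are assumptions.

```python
import heapq

NUM_REDUCERS = 2

def partition(key):
    # Stand-in partitioner (an assumption): route by first letter.
    return 0 if key[0] < "m" else 1

def map_task_spill(records):
    """Sort one map task's output by (partition, key), as in a spill."""
    return sorted(records, key=lambda kv: (partition(kv[0]), kv[0]))

def reduce_task_merge(reducer_id, spills):
    """Merge the already-sorted slices that belong to one reducer."""
    slices = [[kv for kv in spill if partition(kv[0]) == reducer_id]
              for spill in spills]
    # heapq.merge performs a k-way merge without re-sorting the inputs.
    return list(heapq.merge(*slices, key=lambda kv: kv[0]))

map_outputs = [
    [("pear", 1), ("apple", 1), ("zebra", 1)],
    [("mango", 1), ("banana", 1), ("kiwi", 1)],
]
spills = [map_task_spill(m) for m in map_outputs]
merged = [reduce_task_merge(r, spills) for r in range(NUM_REDUCERS)]

for reducer_id, records in enumerate(merged):
    print(reducer_id, [key for key, _ in records])
```

The reducers never call a full sort; they rely on each incoming slice
already being ordered by key.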

-Todd





-- 
Todd Lipcon
Software Engineer, Cloudera

Re: Performance tuning of sort

Posted by 李钰 <ca...@gmail.com>.
Hi Jeff,

Thank you very much for your reply. It really helps! I'll take a careful
look at TotalOrderPartitioner.
BTW, where do you think the bottleneck in Sort lies, and which parameters
affect its performance most? Looking forward to your reply, thanks.

Dear all,

Any other comments? Thanks.

Best Regards,
Carp

2010/6/17 Jeff Zhang <zj...@gmail.com>

> Your understanding of Sort is not right. The key concept of Sort is
> the TotalOrderPartitioner. Before the map-reduce job starts, the client
> side samples the input data to estimate its distribution. The mapper
> does nothing; each reducer fetches its data according to the
> TotalOrderPartitioner. The data within each reducer is locally sorted,
> and the reducers themselves are ordered (r0 < r1 < r2 ...), so the
> overall result is sorted.

Re: Performance tuning of sort

Posted by Jeff Zhang <zj...@gmail.com>.
Your understanding of Sort is not right. The key concept of Sort is
the TotalOrderPartitioner. Before the map-reduce job starts, the client
side samples the input data to estimate its distribution. The mapper
does nothing; each reducer fetches its data according to the
TotalOrderPartitioner. The data within each reducer is locally sorted,
and the reducers themselves are ordered (r0 < r1 < r2 ...), so the
overall result is sorted.
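
The sampling and range-partitioning idea can be sketched in a few lines of
Python. This is only an illustration, not Hadoop's TotalOrderPartitioner:
the names `pick_split_points` and `partition_for` are hypothetical, and the
sample is taken at a fixed stride rather than with one of Hadoop's samplers.

```python
import bisect

NUM_REDUCERS = 4

def pick_split_points(sample, num_reducers):
    """Choose num_reducers - 1 split keys from a sample of the input."""
    ordered = sorted(sample)
    step = len(ordered) / num_reducers
    return [ordered[int(step * i)] for i in range(1, num_reducers)]

def partition_for(key, split_points):
    """Return the index of the reducer whose key range contains key."""
    return bisect.bisect_right(split_points, key)

# The keys 0..100 in scrambled order stand in for the job input.
keys = [(i * 37) % 101 for i in range(101)]

# Client-side sample of the input (fixed stride, for reproducibility).
sample = keys[::10]
splits = pick_split_points(sample, NUM_REDUCERS)

reducers = [[] for _ in range(NUM_REDUCERS)]
for k in keys:
    reducers[partition_for(k, splits)].append(k)

# Each reducer orders only its own range; concatenating the reducers'
# outputs in reducer order then yields a globally sorted result.
output = [k for r in reducers for k in sorted(r)]

print(splits)
print([len(r) for r in reducers])
```

How evenly the keys spread across reducers depends on how representative the
sample is, so the partition sizes printed at the end are worth checking.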



On Thu, Jun 17, 2010 at 12:13 AM, 李钰 <ca...@gmail.com> wrote:
> Hi all,
>
> I'm doing some tuning of the sort benchmark of hadoop. To be more specified,
> running test against the org.apache.hadoop.examples.Sort class. As looking
> through the source code, I think the map tasks take responsibility of
> sorting the input data, and the reduce tasks just merge the map outputs and
> write them into HDFS. But here I've got a question I couldn't understand:
> the time cost of the reduce phase of each reduce task, that is writing data
> into HDFS, is different from each other. Since the input data and operations
> of each reduce task is the same, what reason will cause the execution time
> different? Is there anything wrong of my understanding? Does anybody have
> any experience on this? Badly need your help, thanks.
>
> Best Regards,
> Carp
>



-- 
Best Regards

Jeff Zhang