Posted to common-user@hadoop.apache.org by roman kolcun <ro...@gmail.com> on 2009/08/20 03:55:14 UTC

File Chunk to Map Thread Association

Hello everyone,
could anyone please tell me in which class and method Hadoop downloads a
file chunk from HDFS and associates it with the thread that executes the
Map function on that chunk?
I would like to extend Hadoop so that one "Task" may have several chunks
associated with it, and one Map thread processes these two (or more) chunks
sequentially as if they were a single file.

Thank you in advance.

Yours sincerely,
Roman

Re: File Chunk to Map Thread Association

Posted by roman kolcun <ro...@gmail.com>.
Thanks Tom,
I will have a look at it.

Cheers,
Roman

On Thu, Aug 20, 2009 at 3:02 PM, Tom White <to...@cloudera.com> wrote:

> Hi Roman,
>
> Have a look at CombineFileInputFormat - it might be related to what
> you are trying to do.
>
> Cheers,
> Tom

Re: File Chunk to Map Thread Association

Posted by Tom White <to...@cloudera.com>.
Hi Roman,

Have a look at CombineFileInputFormat - it might be related to what
you are trying to do.

Cheers,
Tom

On Thu, Aug 20, 2009 at 10:59 AM, roman kolcun<ro...@gmail.com> wrote:
>> I see what you mean - you want to modify the hadoop code to allocate
>> multiple (non-sequential) data-local blocks to one MapTask.
>
> That's exactly what I want to do.

Re: File Chunk to Map Thread Association

Posted by roman kolcun <ro...@gmail.com>.
On Thu, Aug 20, 2009 at 10:30 AM, Harish Mallipeddi <
harish.mallipeddi@gmail.com> wrote:

> On Thu, Aug 20, 2009 at 2:39 PM, roman kolcun <ro...@gmail.com>
> wrote:
>
> >
> > Hello Harish,
> >
> > I know that TaskTracker creates separate threads (up to
> > mapred.tasktracker.map.tasks.maximum) which execute the map() function.
> > However, I haven't found the piece of code which associate FileSplit with
> > the given map thread. Is it downloaded locally in the TaskTracker function
> > or in MapTask?
> >
> >
> >
> Yes this is done by the MapTask.


Thanks, I will have a better look into it.

>
>
> >
> > I know I can increase the input file size by changing
> > 'mapred.min.split.size' , however, the file is split sequentially and very
> > rarely two consecutive HDFS blocks are stored on a single node. This means
> > that the data locality will not be exploited cause every map() will have to
> > download part of the file from network.
> >
> > Roman Kolcun
> >
>
> I see what you mean - you want to modify the hadoop code to allocate
> multiple (non-sequential) data-local blocks to one MapTask.


That's exactly what I want to do.


> I don't know if you'll achieve much by doing all that work.


Basically, I would like to emulate a larger DFS block size. I ran two
word count benchmarks on a cluster of 10 machines with a 100GB file. With a
64MB block size it took 2035 seconds; when I increased it to 256MB it took
1694 seconds, which is a 16.76% reduction in running time.
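
As a quick check of these figures, the sketch below recomputes the relative change from the two reported run times. It only restates the arithmetic; the variable names are illustrative and no new measurements are implied.

```python
# Wall-clock times reported in the message above (seconds).
t_64mb = 2035    # run with 64 MB DFS block size
t_256mb = 1694   # run with 256 MB DFS block size

saved = t_64mb - t_256mb                # absolute saving in seconds
reduction_pct = 100 * saved / t_64mb    # saving relative to the 64 MB run
speedup = t_64mb / t_256mb              # how many times faster the 256 MB run is

print(f"saved {saved} s, {reduction_pct:.2f}% less time, speedup {speedup:.2f}x")
```

The percentage is measured against the slower 64MB run, so it describes a drop in running time rather than an increase.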


> Hadoop lets you reuse the
> launched JVMs for multiple MapTasks. That should minimize the overhead of
> launching MapTasks.
> Increasing the DFS blocksize for the input files is another means to achieve
> the same effect.

Do you think that this overhead could be eliminated by reusing JVMs?
I am doing it as a project for my university degree, so I really hope it will
lower the processing time significantly. I would like to make it general for
different block sizes.

Thank you for your help.

Roman Kolcun

Re: File Chunk to Map Thread Association

Posted by roman kolcun <ro...@gmail.com>.
Hello Ted,
I know that Hadoop tries to exploit data locality and that the locality it
achieves is pretty high. However, data locality cannot be exploited when
'mapred.min.split.size' is set much higher than the DFS block size, because
consecutive blocks are not stored on a single machine.
I have found that there is a pretty high overhead when starting a new map
task. With the DFS block size set to 64MB, one map task took 21 seconds on
average. With the DFS block size set to 256MB, one map task took 63 seconds
on average. Since the block size is 4 times larger, processing 256MB in 64MB
chunks takes 4 x 21 = 84 seconds, which is 21 seconds longer than processing
it as a single 256MB chunk.
Therefore I would like to merge several smaller local chunks into a larger
one and process it with one mapper.
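
The arithmetic above can be sketched as follows. The first part only restates the reported averages. The second part assumes, purely for illustration, that task time is a fixed start-up cost plus a cost proportional to the data size; that linear model is an assumption, not something measured here.

```python
# Average map task durations reported above (seconds) for two block sizes (MB).
size_small, time_small = 64, 21
size_large, time_large = 256, 63

# Four 64 MB tasks cover the same data as one 256 MB task.
tasks_needed = size_large // size_small    # 4 tasks
total_small = tasks_needed * time_small    # total time with 64 MB chunks
extra = total_small - time_large           # time lost to the extra tasks

# Assumption (not from the measurements): time = fixed_overhead + rate * size.
rate = (time_large - time_small) / (size_large - size_small)   # seconds per MB
fixed_overhead = time_small - rate * size_small                # seconds per task

print(total_small, extra, rate, fixed_overhead)
```

Under that assumed model the fixed cost comes out at about 7 seconds per task, and the three extra task launches account for the 21 second difference.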

Roman Kolcun

On Thu, Aug 20, 2009 at 6:36 PM, Ted Dunning <te...@gmail.com> wrote:

> Uhh.... hadoop already goes to considerable lengths to make sure that
> computation is local.  In my experience it is common for 90% of the map
> invocations to be working from local data.  Hadoop doesn't know about record
> boundaries so a little bit of slop into a non-local block is possible to
> finish reading a record, but the locality achieved is already quite high.
>
> Are you sure that what you are trying to fix is actually broken?

Re: File Chunk to Map Thread Association

Posted by Ted Dunning <te...@gmail.com>.
Uhh.... hadoop already goes to considerable lengths to make sure that
computation is local.  In my experience it is common for 90% of the map
invocations to be working from local data.  Hadoop doesn't know about record
boundaries so a little bit of slop into a non-local block is possible to
finish reading a record, but the locality achieved is already quite high.

Are you sure that what you are trying to fix is actually broken?

On Thu, Aug 20, 2009 at 2:30 AM, Harish Mallipeddi <harish.mallipeddi@gmail.com> wrote:

> > I know I can increase the input file size by changing
> > 'mapred.min.split.size' , however, the file is split sequentially and very
> > rarely two consecutive HDFS blocks are stored on a single node. This means
> > that the data locality will not be exploited cause every map() will have to
> > download part of the file from network.
> >
> > Roman Kolcun
>
> I see what you mean - you want to modify the hadoop code to allocate
> multiple (non-sequential) data-local blocks to one MapTask. I don't know if
> you'll achieve much by doing all that work. Hadoop lets you reuse the
> launched JVMs for multiple MapTasks. That should minimize the overhead of
> launching MapTasks.
> Increasing the DFS blocksize for the input files is another means to achieve
> the same effect.



-- 
Ted Dunning, CTO
DeepDyve

Re: File Chunk to Map Thread Association

Posted by Harish Mallipeddi <ha...@gmail.com>.
On Thu, Aug 20, 2009 at 2:39 PM, roman kolcun <ro...@gmail.com> wrote:

>
> Hello Harish,
>
> I know that TaskTracker creates separate threads (up to
> mapred.tasktracker.map.tasks.maximum) which execute the map() function.
> However, I haven't found the piece of code which associate FileSplit with
> the given map thread. Is it downloaded locally in the TaskTracker function
> or in MapTask?
>
>
>
Yes this is done by the MapTask.


>
> I know I can increase the input file size by changing
> 'mapred.min.split.size' , however, the file is split sequentially and very
> rarely two consecutive HDFS blocks are stored on a single node. This means
> that the data locality will not be exploited cause every map() will have to
> download part of the file from network.
>
> Roman Kolcun
>

I see what you mean - you want to modify the hadoop code to allocate
multiple (non-sequential) data-local blocks to one MapTask. I don't know if
you'll achieve much by doing all that work. Hadoop lets you reuse the
launched JVMs for multiple MapTasks. That should minimize the overhead of
launching MapTasks.
Increasing the DFS blocksize for the input files is another means to achieve
the same effect.
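
To make the idea of giving several non-sequential, data-local blocks to one task concrete, here is a toy sketch of a greedy grouping. It is not Hadoop code: the function name, the block-to-host map, and the host names are all hypothetical, and a real implementation would also have to consider racks, replica choice, and split sizes in bytes.

```python
from collections import defaultdict

def group_local_blocks(block_hosts, blocks_per_split):
    """Greedily pack blocks that share a host into multi-block splits.

    block_hosts: dict mapping block id -> list of hosts holding a replica.
    Returns a list of (host, [block ids]); host is None for leftover groups.
    """
    by_host = defaultdict(list)
    for block, hosts in sorted(block_hosts.items()):
        for host in hosts:
            by_host[host].append(block)

    assigned = set()
    splits = []
    for host in sorted(by_host):
        pending = [b for b in by_host[host] if b not in assigned]
        # Emit only full groups here; partial groups become leftovers below.
        while len(pending) >= blocks_per_split:
            chunk, pending = pending[:blocks_per_split], pending[blocks_per_split:]
            assigned.update(chunk)
            splits.append((host, chunk))

    # Blocks that could not be grouped locally are packed without a preferred host.
    leftovers = [b for b in sorted(block_hosts) if b not in assigned]
    for i in range(0, len(leftovers), blocks_per_split):
        splits.append((None, leftovers[i:i + blocks_per_split]))
    return splits

# Hypothetical placement: 6 blocks, 2 replicas each, on hosts a, b and c.
example = {0: ["a", "b"], 1: ["b", "c"], 2: ["a", "c"],
           3: ["b", "c"], 4: ["a", "b"], 5: ["c", "a"]}
for host, blocks in group_local_blocks(example, 2):
    print(host, blocks)
```

In this example every two-block split can be read entirely from one host, even though none of the grouped blocks are consecutive in the file.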


-- 
Harish Mallipeddi
http://blog.poundbang.in

Re: File Chunk to Map Thread Association

Posted by roman kolcun <ro...@gmail.com>.
On Thu, Aug 20, 2009 at 6:49 AM, Harish Mallipeddi <
harish.mallipeddi@gmail.com> wrote:

> On Thu, Aug 20, 2009 at 7:25 AM, roman kolcun <ro...@gmail.com>
> wrote:
>
> > Hello everyone,
> > could anyone please tell me in which class and which method does Hadoop
> > download the file chunk from HDFS and associate it with the thread that
> > executes the Map function on given chunk and process it?
> >
>
> Hi Roman,
>
> First of all the map() function is executed in a MapTask (a separate child
> JVM is spawned by the TaskTracker). So it doesn't run inside a thread in the
> TaskTracker process.


Hello Harish,

I know that TaskTracker creates separate threads (up to
mapred.tasktracker.map.tasks.maximum) which execute the map() function.
However, I haven't found the piece of code which associates a FileSplit with
the given map thread. Is the split downloaded locally in the TaskTracker
or in the MapTask?


> >
> > I would like to extend the Hadoop so one "Task" may have more chunks
> > associated and one Map thread will process these two (or more chunks)
> > sequentially as if it was just a single file.
>
>
> You don't really have to change anything inside Hadoop to increase the size
> of InputSplits that MapTasks consume. Just modify 'mapred.min.split.size' if
> you want the splits to be larger. You could also write your own custom
> InputFormat class.
>

I know I can increase the input split size by changing
'mapred.min.split.size'. However, the file is split sequentially, and two
consecutive HDFS blocks are very rarely stored on a single node. This means
that data locality will not be exploited, because every map() will have to
download part of its input over the network.
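
The effect can be illustrated with a small sketch that, for splits made of consecutive blocks, picks the host holding the most blocks of each split and counts how many blocks would be local there. The placement below is made up for illustration (8 blocks, one replica each, 4 nodes), and the function name is hypothetical.

```python
def sequential_split_locality(block_hosts, blocks_per_split):
    """For splits of consecutive blocks, report the best achievable locality.

    block_hosts: list where entry i is the set of hosts storing block i.
    Returns a list of (best_host, local_blocks, total_blocks), one per split.
    """
    report = []
    for start in range(0, len(block_hosts), blocks_per_split):
        group = block_hosts[start:start + blocks_per_split]
        candidates = sorted(set().union(*group))
        # Pick the host that already stores the most blocks of this split.
        best = max(candidates, key=lambda h: sum(h in hosts for hosts in group))
        local = sum(best in hosts for hosts in group)
        report.append((best, local, len(group)))
    return report

# Hypothetical placement: consecutive blocks land on different nodes.
placement = [{"n1"}, {"n2"}, {"n3"}, {"n4"}, {"n1"}, {"n3"}, {"n2"}, {"n4"}]
for host, local, total in sequential_split_locality(placement, 4):
    print(f"run on {host}: {local}/{total} blocks local")
```

With this placement, a split of four consecutive blocks has at most one local block wherever it runs, so the other three must be fetched over the network.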

Roman Kolcun

Re: File Chunk to Map Thread Association

Posted by Harish Mallipeddi <ha...@gmail.com>.
On Thu, Aug 20, 2009 at 7:25 AM, roman kolcun <ro...@gmail.com> wrote:

> Hello everyone,
> could anyone please tell me in which class and which method does Hadoop
> download the file chunk from HDFS and associate it with the thread that
> executes the Map function on given chunk and process it?
>

Hi Roman,

First of all, the map() function is executed in a MapTask (the TaskTracker
spawns a separate child JVM for it). So it doesn't run inside a thread in the
TaskTracker process.


> I would like to extend the Hadoop so one "Task" may have more chunks
> associated and one Map thread will process these two (or more chunks)
> sequentially as if it was just a single file.
>

You don't really have to change anything inside Hadoop to increase the size
of InputSplits that MapTasks consume. Just modify 'mapred.min.split.size' if
you want the splits to be larger. You could also write your own custom
InputFormat class.
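
A rough sketch of how 'mapred.min.split.size' changes the number of splits, assuming the split size is chosen as max(minSize, min(goalSize, blockSize)), which is how the classic FileInputFormat is commonly described. The helper names, the 100GB file, and the goal size are illustrative assumptions, and the sketch ignores any slack an implementation may allow for the last split.

```python
MB = 1024 * 1024

def compute_split_size(goal_size, min_size, block_size):
    # Assumed rule: never go below min_size, otherwise cap at the block size.
    return max(min_size, min(goal_size, block_size))

def count_splits(file_size, split_size):
    # Ceiling division: a partial trailing split still counts as one split.
    return -(-file_size // split_size)

file_size = 100 * 1024 * MB   # hypothetical 100 GB input file
block_size = 64 * MB          # DFS block size
goal_size = file_size         # hypothetical goal: a hint of a single map task

for min_size in (1, 256 * MB):
    split = compute_split_size(goal_size, min_size, block_size)
    print(min_size, split // MB, count_splits(file_size, split))
```

Raising the minimum above the block size makes each split span several consecutive blocks, which reduces the number of map tasks but says nothing about where those blocks are stored.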

-- 
Harish Mallipeddi
http://blog.poundbang.in