Posted to common-dev@hadoop.apache.org by roman kolcun <ro...@gmail.com> on 2009/07/15 04:36:36 UTC

Hadoop execution improvement in a heterogeneous environment

Hello everyone,
I've got an idea of how to improve the execution time of the map phase in a
heterogeneous environment (where other processes may run on some of the
machines, rendering them slower than the other machines). Currently the map
phase processes data in fairly large chunks (usually 64MB). Towards the end of
the map phase, chunks assigned to slower nodes (stragglers) are speculatively
reassigned to faster nodes, the map phase finishes as soon as one of the two
duplicate tasks completes, and the other one is killed.

Let's assume there are two TaskTrackers, each processing 128MB, and one of
them is significantly slower. After the first node has processed all of its
128MB (2 x 64MB), the second node has processed, let's say, only 80MB
(1 x 64MB + 16MB). Currently the second chunk of the second node (64MB) will
be reassigned to the first node, which will probably finish it faster than the
second node, and the second node's copy of the task will then be killed. By
the time the first node finishes the reassigned data, the second node will
have processed approx. 100MB, which means that roughly 36MB were processed
redundantly.

My idea is that on each node there will be a special DataAssign thread which
will take care of assigning data to each map thread. This thread will
communicate with the JobTracker and inform it about the progress. It will also
split the data into smaller chunks (e.g. 32MB) and assign these smaller chunks
to the map threads upon request. In the scenario above, the JobTracker will
realise that the second node is slower, so it will notify the DataAssign
thread not to assign the last 32MB to its own node; that last smaller chunk
(only 32MB) will instead be assigned to the first node. At the end of the day,
no data will be processed twice and the overall execution should be shorter.
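
A very rough sketch of what I have in mind is below. None of these classes
exist in Hadoop today - the names (SubChunk, DataAssignThread, the heartbeat
hook) are invented purely to make the idea concrete, and the real integration
points in the TaskTracker would of course look different.

import java.util.ArrayDeque;
import java.util.Queue;

public class DataAssignThread extends Thread {

    static class SubChunk {
        final long offset;
        final long length;   // e.g. 32MB instead of the full 64MB split
        SubChunk(long offset, long length) { this.offset = offset; this.length = length; }
    }

    private final Queue<SubChunk> pending = new ArrayDeque<SubChunk>();
    // Set when the JobTracker decides this node is a straggler and moves
    // our last sub-chunk to a faster node.
    private volatile boolean giveUpLastSubChunk = false;

    DataAssignThread(long splitStart, long splitLength, long subChunkSize) {
        for (long off = splitStart; off < splitStart + splitLength; off += subChunkSize) {
            long len = Math.min(subChunkSize, splitStart + splitLength - off);
            pending.add(new SubChunk(off, len));
        }
    }

    /** Called by a local map thread whenever it is ready for more input. */
    synchronized SubChunk nextSubChunk() {
        if (giveUpLastSubChunk && pending.size() == 1) {
            return null;   // the last 32MB now belongs to the faster node
        }
        return pending.poll();
    }

    /** Placeholder for the per-heartbeat progress report to the JobTracker. */
    void reportProgress() {
        // send pending.size() and bytes processed; if the JobTracker decides
        // this node is too slow, it answers by setting giveUpLastSubChunk.
    }

    @Override
    public void run() {
        while (true) {
            synchronized (this) {
                if (pending.isEmpty()) break;
            }
            reportProgress();
            try { Thread.sleep(3000); } catch (InterruptedException e) { return; }
        }
    }
}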

In addition to this I would like to change the implementation of the
JobTracker so that it assigns the whole input file(s) to all nodes at the
beginning, taking data locality into account. After that, the data may be
rebalanced depending on the info retrieved from the DataAssign thread.
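
Again just a sketch, with invented types, of what the up-front, locality-aware
assignment could look like: a greedy pass that prefers trackers holding a
replica of the block and falls back to the least-loaded tracker otherwise.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpfrontAssigner {

    /** A split plus the hosts that hold a replica of its block. */
    static class Split {
        final List<String> replicaHosts;
        Split(List<String> replicaHosts) { this.replicaHosts = replicaHosts; }
    }

    /** Greedily give every split to a tracker that holds one of its replicas,
     *  falling back to the least-loaded tracker when no replica is local. */
    static Map<String, List<Split>> assign(List<Split> splits, List<String> trackers) {
        Map<String, List<Split>> plan = new HashMap<String, List<Split>>();
        for (String t : trackers) {
            plan.put(t, new ArrayList<Split>());
        }
        for (Split s : splits) {
            String best = null;
            for (String host : s.replicaHosts) {   // prefer a tracker with a local replica
                if (plan.containsKey(host)
                    && (best == null || plan.get(host).size() < plan.get(best).size())) {
                    best = host;
                }
            }
            if (best == null) {                    // nothing local: pick the least loaded
                for (String t : trackers) {
                    if (best == null || plan.get(t).size() < plan.get(best).size()) {
                        best = t;
                    }
                }
            }
            plan.get(best).add(s);
        }
        return plan;   // later rebalanced using the DataAssign progress reports
    }
}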

Do you think it is a good idea? Do you think it could work? Has anyone worked
on (or is anyone working on) this or a similar feature?

Thank you for all replies and suggestions.

Yours Sincerely,
Roman

Re: Hadoop execution improvement in a heterogeneous environment

Posted by roman kolcun <ro...@gmail.com>.
Thank you for your reply.

See my responses inline.

On Thu, Jul 16, 2009 at 4:23 AM, Jothi Padmanabhan <jo...@yahoo-inc.com> wrote:

> See some responses inline.
>
> > My idea is that on each node there will be a special DataAssign thread
> > which will take care of assigning data to each map thread. This thread
> > will communicate with the JobTracker and inform it about the progress. It
> > will also split the data into smaller chunks (e.g. 32MB) and assign these
> > smaller chunks to the map threads upon request. In the scenario above, the
> > JobTracker will realise that the second node is slower, so it will notify
> > the DataAssign thread not to assign the last 32MB to its own node; that
> > last smaller chunk (only 32MB) will instead be assigned to the first node.
> > At the end of the day, no data will be processed twice and the overall
> > execution should be shorter.
>
> Could you not achieve this by just setting the block size to 32MB so that
> each map handles a smaller chunk?


If the chunks were smaller, we would have a higher communication overhead. In
addition, some time would be wasted on Mapper object initialisation, because a
Mapper is initialised for each chunk.
Another disadvantage is that in some cases there would be more mappers
performing speculative execution than needed. These mappers could have been
used as reducers instead. A toy calculation of the per-task overhead is below.
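
Just to make the overhead argument concrete, here is a back-of-the-envelope
calculation (the per-task cost and the input size are made-up numbers, not
measurements): with a fixed per-task cost for task setup and Mapper
initialisation, halving the split size doubles the number of map tasks and
therefore doubles that part of the total overhead.

public class SplitOverhead {
    public static void main(String[] args) {
        long totalMb = 10 * 1024;         // 10 GB of input (assumed)
        double perTaskOverheadSec = 2.0;  // assumed setup + Mapper init cost per task

        for (long splitMb : new long[] {64, 32, 16}) {
            long tasks = totalMb / splitMb;
            System.out.printf("split=%dMB -> %d map tasks, ~%.0fs total overhead%n",
                              splitMb, tasks, tasks * perTaskOverheadSec);
        }
    }
}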


> >
> > In addition to this I would like to change the implementation of the
> > JobTracker so that it assigns the whole input file(s) to all nodes at the
> > beginning, taking data locality into account. After that, the data may be
> > rebalanced depending on the info retrieved from the DataAssign thread.
>
> What do you mean by "assign whole files"? Files typically reside in DFS and
> are accessible from all nodes. And files are made of blocks, so locality
> actually is at the block level and not at the file level.


I mean that the whole file will be partitioned and assigned to all available
TaskTrackers at the beginning. Currently the file is split into equal chunks
which are assigned to the TaskTrackers upon their request - one chunk per
request. If the chunks are too small, this might create a high communication
overhead. In addition, let's say we have to process 2 chunks, which are stored
on nodes 1 & 2, and there are 3 task trackers.

Let's say that node #3 asks for a chunk first; the JobTracker will assign the
chunk from node #1 to it, so that chunk will be sent over the network. Then
node #1 asks for a chunk and is assigned the chunk stored on node #2 - so
again it will be sent over the network. Node #2 will not do anything. So
instead of assigning nodes to their local data, all chunks end up being sent
over the network.

If we knew which task trackers are available and where the files reside (both
pieces of information are available), we could do a much better file-to-node
association.

Also, if a node stores more data, it would be better to assign several local
chunks to it at once so that we avoid the communication overhead. We would
also save time on initialisation, because a Mapper object is created for each
input chunk, and with bigger chunks we save several object initialisations. A
rough sketch of such a request handler is below.
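
Sketch of a "give a tracker its local chunks first, several per request"
policy. As before, the types are invented (chunk ids mapped to the single host
that stores them) just to make the argument concrete; real HDFS blocks have
several replicas, and the fallback to remote chunks would need to be smarter.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchedLocalAssignment {

    /** Return up to maxPerRequest chunks for the requesting host, local ones first. */
    static List<Integer> chunksFor(String requestingHost,
                                   Map<Integer, String> chunkLocations,
                                   Set<Integer> unassigned,
                                   int maxPerRequest) {
        List<Integer> out = new ArrayList<Integer>();
        // First pass: hand out only chunks stored on the requesting host.
        for (Integer chunk : new ArrayList<Integer>(unassigned)) {
            if (out.size() == maxPerRequest) break;
            if (requestingHost.equals(chunkLocations.get(chunk))) {
                unassigned.remove(chunk);
                out.add(chunk);
            }
        }
        // Fall back to a remote chunk only when the requester has nothing local;
        // a smarter version would also check whether some other tracker could
        // still take that chunk locally before shipping it over the network.
        if (out.isEmpty() && !unassigned.isEmpty()) {
            Integer any = unassigned.iterator().next();
            unassigned.remove(any);
            out.add(any);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> loc = new HashMap<Integer, String>();
        loc.put(1, "node1");   // chunk 1 lives on node1
        loc.put(2, "node2");   // chunk 2 lives on node2
        Set<Integer> unassigned = new HashSet<Integer>(loc.keySet());
        System.out.println("node1 gets " + chunksFor("node1", loc, unassigned, 2));
        System.out.println("node2 gets " + chunksFor("node2", loc, unassigned, 2));
        // node3 asking now gets nothing - there is no work left to ship remotely.
        System.out.println("node3 gets " + chunksFor("node3", loc, unassigned, 2));
    }
}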


> Cheers
> Jothi
>
>
Cheers,
Roman

Re: Hadoop execution improvement in a heterogeneous environment

Posted by Jothi Padmanabhan <jo...@yahoo-inc.com>.
See some responses inline.

> My idea is that on each node there will be a special DataAssign thread
> which will take care of assigning data to each map thread. This thread will
> communicate with the JobTracker and inform it about the progress. It will
> also split the data into smaller chunks (e.g. 32MB) and assign these smaller
> chunks to the map threads upon request. In the scenario above, the
> JobTracker will realise that the second node is slower, so it will notify
> the DataAssign thread not to assign the last 32MB to its own node; that last
> smaller chunk (only 32MB) will instead be assigned to the first node. At the
> end of the day, no data will be processed twice and the overall execution
> should be shorter.

Could you not achieve this by just setting the block size to 32MB so that
each map handles a smaller chunk?
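
For example (just a sketch - property names and defaults vary a bit between
Hadoop versions, so treat the key below as approximate), you could reload the
input with a 32 MB HDFS block size, and each map would then naturally get a
32 MB split:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SmallBlockLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 32 MB blocks for files created with this configuration
        conf.setLong("dfs.block.size", 32L * 1024 * 1024);
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path(args[0]), new Path(args[1]));
        // Roughly equivalent on the command line:
        //   hadoop fs -D dfs.block.size=33554432 -put <local file> <hdfs dir>
    }
}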

> 
> In addition to this I would like to change the implementation of the
> JobTracker so that it assigns the whole input file(s) to all nodes at the
> beginning, taking data locality into account. After that, the data may be
> rebalanced depending on the info retrieved from the DataAssign thread.

What do you mean by "assign whole files"? Files typically reside in DFS and
are accessible from all nodes. And files are made of blocks, so locality
actually is at the block level and not at the file level.


Cheers
Jothi


Re: Hadoop execution improvement in a heterogeneous environment

Posted by roman kolcun <ro...@gmail.com>.
Hello everyone,
I am not sure whether this fits into the "common user" mailing list, but I
haven't received any reply on the development mailing list, so I am trying it
here.

I've got an idea of how to improve the execution time of the map phase in a
heterogeneous environment (where other processes may run on some of the
machines, rendering them slower than the other machines). Currently the map
phase processes data in fairly large chunks (usually 64MB). Towards the end of
the map phase, chunks assigned to slower nodes (stragglers) are speculatively
reassigned to faster nodes, the map phase finishes as soon as one of the two
duplicate tasks completes, and the other one is killed.

Let's assume there are two TaskTrackers, each processing 128MB, and one of
them is significantly slower. After the first node has processed all of its
128MB (2 x 64MB), the second node has processed, let's say, only 80MB
(1 x 64MB + 16MB). Currently the second chunk of the second node (64MB) will
be reassigned to the first node, which will probably finish it faster than the
second node, and the second node's copy of the task will then be killed. By
the time the first node finishes the reassigned data, the second node will
have processed approx. 100MB, which means that roughly 36MB were processed
redundantly.

My idea is that on each node there will be a special DataAssign thread which
will take care of assigning data to each map thread. This thread will
communicate with the JobTracker and inform it about the progress. It will also
split the data into smaller chunks (e.g. 32MB) and assign these smaller chunks
to the map threads upon request. In the scenario above, the JobTracker will
realise that the second node is slower, so it will notify the DataAssign
thread not to assign the last 32MB to its own node; that last smaller chunk
(only 32MB) will instead be assigned to the first node. At the end of the day,
no data will be processed twice and the overall execution should be shorter.

In addition to this I would like to change the implementation of the
JobTracker so that it assigns the whole input file(s) to all nodes at the
beginning, taking data locality into account. After that, the data may be
rebalanced depending on the info retrieved from the DataAssign thread.

Do you think it is a good idea? Do you think it could work? Has anyone worked
on (or is anyone working on) this or a similar feature?

Thank you for all replies and suggestions.

Yours Sincerely,
Roman