You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hadoop.apache.org by John Lilley <jo...@redpoint.net> on 2013/07/02 01:07:34 UTC

RE: Assignment of data splits to mappers

Bertrand,

Ah yes, I can see the wisdom of smaller tasks in (1).  Given that, does MR attempt to assign multiple blocks per task when the #blocks >> #nodes?

Regarding (2) we can run a simple thought experiment.  It seems likely that every block will have one dangling record, requiring a small read of the next block.  Let's consider the disk seek overhead.  Assume 64MB blocks and 6ms disk seeks.  If our network transports 64MB/sec on average, and a disk seek is 6ms, we can expect a task to take 1.006 seconds instead of 1.000 seconds.  The read of the extra block may induce a secondary seek under load by interrupting an otherwise-sequential transfer on another task, but it still seems likely that the seek overhead is < 1% under these assumptions.  It would be nice to know the TCP connection setup/teardown overhead as well, assuming each datanode switch induces a connection.

Cheers,
John


From: Bertrand Dechoux [mailto:dechouxb@gmail.com]
Sent: Tuesday, June 18, 2013 3:54 PM
To: user@hadoop.apache.org
Subject: Re: Assignment of data splits to mappers

1) The tradeoff is between reducing the overhead of distributed computing and reducing the cost of failure.
Less tasks, less overhead but the cost of failure will be bigger, mainly because the distribution will be coarser. One of the reason was outlined before. A (failed) task is related to an input split. Even when there is a single remaining task, the job tracker can not split it into several smaller subtasks to reduce the overall latency. But if there is too much tasks, the startup of the JVM itself can be a significant overhead.
2) It is assumed to not be significant. I would be interested to see the numbers but I don't know any deep studies of the impact.
The biggest

Bertrand

On Fri, Jun 14, 2013 at 9:50 PM, John Lilley <jo...@redpoint.net>> wrote:
Bertrand,
Thanks for taking the time to explain this!
I understand your point about contiguous blocks; they just aren't likely to exist.  I am still curious about two things:

1)      The map-per-block strategy.  If we have a lot more blocks than containers, wouldn't there be some advantage to having fewer maps (which means fewer connections, less seeking etc)?  Of course, increasing the block size would lead to the same thing and contiguous data to boot, but one doesn't always know the total data size.

2)      The record-spanning-blocks issue.  I understand that under most file formats, records *will* span blocks.  But if it were simple to prevent them from spanning blocks, would that be of benefit?
john

From: Bertrand Dechoux [mailto:dechouxb@gmail.com<ma...@gmail.com>]
Sent: Thursday, June 13, 2013 3:37 PM
To: user@hadoop.apache.org<ma...@hadoop.apache.org>
Subject: Re: Assignment of data splits to mappers

The first question can be split (no pun intended) into two topics because there is actually two distinct steps. First, the InputFormat partitions the data source into InputSplits. Its implementation will determine the exact logic. Then the scheduler is responsible for ordering where/when the InputSplit should be processed. But it doesn't really deal with block itself. The InputSplit itself knows on which node the data would be local or not.
If there is no other choice, you (or more exactly the implementation) can choose to have several blocks per InputSplit. But of course, it open lots of issues. The default strategy is one block per InputSplit (and thus per map task because there is one map task per InputSplit). If you really need to put several blocks per InputSplit, the root cause might often be that the block size is not big enough. I think it is fair to assume that the 10000 block file your are referring to is not using a 512MB block size.

MultiFileInputFormat does make InputSplit with blocks that are unlikely to be on the same datanode. But that's a good decision in regard to the kind of data source it has to deal with. Anyway, two 'continuous' blocks are also very unlikely to be on the same datanode (and even less the same HDD, and even less really continuous). The only abstraction to tell whether record of data should be close one from the other is the block. That's why the idea is not really to optimize read of 'continuous' blocks on the same machine/HDD but to consider whether the block size is the right one.

HDFS and Hadoop MapReduce have been designed to work together but there is a clean abstraction between them. HDFS does not know about records and clients writing to HDFS (like MapReduce) do not often need to know the block boundaries explicitly. That's why the RecordReader provided by the InputSplit is responsible for interpreting the data into records. But of course, it has to know how to deal with records stored on the block boundary. It will happen. The advantage is that the record logic can not corrupt the storage and can be selected at read time. TextInputFormat, KeyValueTextInputFormat and NLineInputFormat have different strategies which is only possible due to this abstraction. And that's also why MapReduce can read/write to other kinds of 'datastorage', like HBase for example : because it is not tightly coupled with HDFS. But it does also bring drawbacks.
Regards
Bertrand


On Thu, Jun 13, 2013 at 7:57 PM, John Lilley <jo...@redpoint.net>> wrote:
When MR assigns data splits to map tasks, does it assign a set of non-contiguous blocks to one map?  The reason I ask is, thinking through the problem, if I were the MR scheduler I would attempt to hand a map task a bunch of blocks that all exist on the same datanode, and then schedule the map task on that node.  E.g. if I have an HDFS file with 10000 blocks and I want to create 1000 map tasks I'd like each map task to have 10 blocks, but those blocks are unlikely to be contiguous on a given datanode.

This is related to a question I had asked earlier, which is whether any benefit could be had by aligning data splits along block boundaries to avoid slopping reads of a block to the next block and requiring another datanode connection.  The answer I got was that the extra connection overhead wasn't important.  The reason I bring this up again is that comments in this discussion (https://issues.apache.org/jira/browse/HADOOP-3315) imply that doing an extra seek to the beginning of the file to read a magic number on open is a significant overhead, and this looks like a similar issue to me.

Thanks,
john




--
Bertrand Dechoux



--
Bertrand Dechoux