Posted to mapreduce-user@hadoop.apache.org by yurgis <ur...@gmail.com> on 2011/04/25 21:55:43 UTC

Map-only jobs and partitioning

I have a use case where I join two equi-partitioned data sets (old and
new) to produce a merged set. In theory, this is trivially solvable with
a map-side join using CompositeInputFormat. No shuffle, sort, or reducer
is needed.

The mapper output files are expected to be named exactly like the input
files (with the same index suffix) so that the output stays partitioned
exactly like the input data set, since it may be used as input to
subsequent merge jobs.

However, this is not the case. In general, a mapper's task id does NOT
match the partition index of its input file. For example, an input file
named part-r-00000 may be processed by the mapper with id #2, so its
output will be named part-m-00002.

That means that partitioning of the merged data set is broken and it
can no longer be used for subsequent merge operations.

I traced the issue down to this place in JobClient.java:

// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new NewSplitComparator());

Because of this sort, the input files are no longer in the order
originally produced by the corresponding InputFormat.
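
To illustrate the effect, here is a minimal, self-contained sketch in
plain Java. It does not use any Hadoop classes: Split,
SplitOrderDemo and outputNamesAfterSizeSort are hypothetical names made
up for this example, and the file sizes are invented. It only assumes
that the task id is the position of the split after the largest-first
sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SplitOrderDemo {
    /** Hypothetical stand-in for an input split: a file name and its length in bytes. */
    static final class Split {
        final String file;
        final long length;

        Split(String file, long length) {
            this.file = file;
            this.length = length;
        }
    }

    /**
     * Sorts a copy of the splits largest-first and treats the position in the
     * sorted list as the task id (an assumption of this sketch). Returns a map
     * from input file name to the resulting map output name.
     */
    static Map<String, String> outputNamesAfterSizeSort(List<Split> splits) {
        List<Split> sorted = new ArrayList<>(splits);
        sorted.sort(Comparator.comparingLong((Split s) -> s.length).reversed());

        Map<String, String> names = new LinkedHashMap<>();
        for (int taskId = 0; taskId < sorted.size(); taskId++) {
            names.put(sorted.get(taskId).file, String.format("part-m-%05d", taskId));
        }
        return names;
    }

    public static void main(String[] args) {
        // Invented sizes: the first partition happens to be the smallest.
        List<Split> splits = Arrays.asList(
                new Split("part-r-00000", 10L),
                new Split("part-r-00001", 30L),
                new Split("part-r-00002", 20L));

        for (Map.Entry<String, String> e : outputNamesAfterSizeSort(splits).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

With these sizes, part-r-00000 is the smallest file, so it ends up last
after the sort and its output is written as part-m-00002.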

I believe it is a major issue that the M/R framework does not preserve
the input file sequence when assigning task ids to mappers.
Preserving it would allow map-side joins (or, more generally, any
map-only job with splitting disabled) to keep the output partitioned
exactly the same as the input.

The split file should preserve the original split order (as produced by
the InputFormat), while mappers can still be executed in order of split
size. The sort by split size can be moved to a later stage; for
example, it can be done right before mapper execution.
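
A rough sketch of what I mean, again in plain Java and not actual
Hadoop code: the split array keeps its InputFormat order, so index i
stays the task id, and a separate array of indices gives the
largest-first launch order. LaunchOrderSketch and launchOrder are
hypothetical names, and how the real scheduler would consume such an
order is left open.

```java
import java.util.Arrays;
import java.util.Comparator;

class LaunchOrderSketch {
    /**
     * Returns task ids (indices into splitLengths) ordered largest split first.
     * The input array itself is not reordered, so task id i still refers to
     * the i-th split as produced by the InputFormat.
     */
    static int[] launchOrder(long[] splitLengths) {
        Integer[] ids = new Integer[splitLengths.length];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = i;
        }
        // Sort the ids by the length of the split they point at, descending.
        Arrays.sort(ids, Comparator.comparingLong((Integer id) -> splitLengths[id]).reversed());

        int[] order = new int[ids.length];
        for (int i = 0; i < ids.length; i++) {
            order[i] = ids[i];
        }
        return order;
    }

    public static void main(String[] args) {
        long[] lengths = {10L, 30L, 20L}; // invented sizes, in InputFormat order
        for (int taskId : launchOrder(lengths)) {
            // Task id and output suffix both match the input partition index.
            System.out.println(String.format("launch task %d -> part-m-%05d", taskId, taskId));
        }
    }
}
```

Here the task for the 30-byte split is launched first, but it is still
task 1 and still writes part-m-00001, so the output partitioning lines
up with the input.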