Posted to common-dev@hadoop.apache.org by John M Cieslewicz <jm...@us.ibm.com> on 2007/07/27 02:02:18 UTC

A Case for Stronger Partial Aggregation Semantics in Hadoop

The current combiner implementation provides the ability to reduce the
amount of required network bandwidth; however, its semantics are weak. With
strong semantics for a combiner-like operation, greater performance
improvements would be possible.

As currently implemented, a single map call may create zero or more output
key-value pairs via a collector. A reducer call receives as input a key
and a list of values, and must output zero or more key-value pairs via a
collector. If defined, a combiner receives as input a key and list of
values with that key from a map output spill. The idea is that the combiner
reduces all of these values for the particular key to one output key-value
pair, thus reducing the total amount of data that must be transferred to a
reducer. The combiner semantics, however, are the same as the reducer’s and
there is nothing to prevent a programmer from implementing a combiner that
changes the value of the key or outputs more or less than one key-value
pair.
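
For concreteness, a typical combiner today is just a Reducer implementation
registered with JobConf.setCombinerClass. A rough word-count style sketch,
assuming the current org.apache.hadoop.mapred interfaces (the class name and
value type here are illustrative only), might be:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// A conventional sum combiner: it happens to preserve the key and emit
// exactly one pair, but nothing in the Reducer interface requires either.
public class SumCombiner extends MapReduceBase implements Reducer {
  public void reduce(WritableComparable key, Iterator values,
                     OutputCollector output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += ((IntWritable) values.next()).get();
    }
    output.collect(key, new IntWritable(sum)); // could legally collect zero or many pairs
  }
}

Nothing stops this same code from collecting several pairs, or a different
key, which is exactly the weakness described above.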

This leads to a number of limitations, chief among them the fact that the
combiner cannot be applied more than once because there are no guarantees
regarding the effects of repeatedly using the combiner (as implemented, the
combiner could produce more than one output pair or change the key).

A summary of desirable semantics:
   1 The map function produces as output partial aggregate values
      representing singletons.
   2 A new combiner function that explicitly performs partial to partial
      aggregation over one or more values, creating one new output value of
      the same type as the input value and not changing the key.
   3 A reducer which takes as input partial aggregates and produces final
      values of any format.

This is very similar to what a database requires of user defined
aggregates. For example, in Informix
(http://publib.boulder.ibm.com/infocenter/idshelp/v10/index.jsp?topic=/com.ibm.udr.doc/udr126.htm)
the user must specify four functions: ‘init’, ‘iter’, ‘combine’, and
‘final’. In Hadoop, ‘init’ and ‘iter’ are already performed by the map. The
‘combine’ function would be implemented by the new combiner function.
Finally, reduce performs the ‘final’ aggregation.
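
To make the correspondence concrete, here is a plain-Java sketch (not tied to
any Hadoop interface; all names and types are illustrative) of the three
desirable semantics above, computing an average with sum/count partial
aggregates:

import java.util.Arrays;
import java.util.Iterator;

public class AverageAggregateSketch {

  // The partial aggregate type: the same type flows out of map, through
  // combine, and into reduce.
  static class SumCount {
    long sum;
    long count;
    SumCount(long sum, long count) { this.sum = sum; this.count = count; }
  }

  // 'init'/'iter' (the map): each input value becomes a singleton partial.
  static SumCount map(long value) {
    return new SumCount(value, 1);
  }

  // 'combine': partial-to-partial; any number of partials in, exactly one
  // out, same type, key untouched. Being associative, it can safely be
  // applied repeatedly -- per spill, during merges, or on the reduce side.
  static SumCount combine(Iterator<SumCount> partials) {
    SumCount result = new SumCount(0, 0);
    while (partials.hasNext()) {
      SumCount p = partials.next();
      result.sum += p.sum;
      result.count += p.count;
    }
    return result;
  }

  // 'final' (the reduce): partials in, a final value of any type out.
  static double reduce(Iterator<SumCount> partials) {
    SumCount total = combine(partials);
    return total.count == 0 ? 0.0 : (double) total.sum / total.count;
  }

  public static void main(String[] args) {
    Iterator<SumCount> partials =
        Arrays.asList(map(2), map(4), map(9)).iterator();
    System.out.println(reduce(partials)); // prints 5.0
  }
}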

This proposal requires a slightly more restrictive combiner, but with the
ability to apply this new combiner function repeatedly, one can obtain some
benefits, including:
   1 Rather than just combining within a mapper’s output spill, one could
      repeat the process during the merge of spills, further reducing the
      amount of data to be transferred.
   2 The reducer can be more aggressively pipelined with partial
      aggregation occurring among the finished map outputs while the
      reducer waits for later map tasks to complete. In this manner, some
      of the aggregation can be pushed into the sort and merge phases.

A potential contract with the user for a more explicit partial-to-partial
aggregate function would simply do away with the collector interface,
pushing any writing logic to the calling function:
Writable partialToPartial(Iterator values, Reporter reporter){… }
The user must return one and only one value. The key could also be provided
as a read-only parameter.
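
As a rough sketch of that contract for a sum aggregate (the class name and
the LongWritable value type are my own assumptions, not part of the
proposal):

import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical partial-to-partial sum: any number of partial values in,
// exactly one value of the same type out; the key is never touched.
public class SumPartialToPartial {
  public Writable partialToPartial(Iterator values, Reporter reporter) {
    long sum = 0;
    while (values.hasNext()) {
      sum += ((LongWritable) values.next()).get();
      // the reporter remains available for status/progress updates if needed
    }
    return new LongWritable(sum);
  }
}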

As with the current combiner, a user need not provide a partial-to-partial
aggregation function. It is only an optimization.

I am currently working to get some performance numbers related to pushing
aggregation into the copying and merging phase of reduce. I welcome any
thoughts regarding this idea.

John Cieslewicz

Re: A Case for Stronger Partial Aggregation Semantics in Hadoop

Posted by John M Cieslewicz <jm...@us.ibm.com>.
"Owen O'Malley" <oo...@yahoo-inc.com> wrote on 07/30/2007 02:31:04 PM:

>
> On Jul 26, 2007, at 5:02 PM, John M Cieslewicz wrote:
> > The combiner semantics, however, are the same as the reducer’s and
> > there is nothing to prevent a programmer from implementing a
> > combiner that
> > changes the value of the key or outputs more or less than one key-
> > value
> > pair.
>
> The combiner and reducer share an interface. However, the semantics
> are different. In particular,
>    1. Combiners may be invoked once or many times on each of the map
> outputs, while reduces will be invoked exactly once on each key.
>    2. As a result of that, combiners effectively can not have side
> effects, while reduces can.
>    3. Reduces can emit different types than their inputs, combiners
> can not.
>    4. Reduces can change the key, while combiners are required not
> to. Currently this is not checked dynamically, although it should be.
> (Things will break badly if combiners do this...)

Rather than checking this dynamically, I think it would be easier and
clearer for the combiner programmer if the combiner interface were defined
along the lines suggested by Doug:
public interface Combiner {
   /** Combine all values passed into a single value that is returned. */
   public Writable combine(WritableComparable key, Iterator values);
}
If such a change seems reasonable, I would be happy to implement the
necessary changes.
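
For example (a sketch only; the max semantics and LongWritable value type
are assumptions), an aggregate under this interface could look like:

import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical max combiner against the interface above: by construction it
// returns exactly one value and has no way to re-emit or modify the key.
public class MaxCombiner implements Combiner {
  public Writable combine(WritableComparable key, Iterator values) {
    long max = Long.MIN_VALUE;
    while (values.hasNext()) {
      max = Math.max(max, ((LongWritable) values.next()).get());
    }
    return new LongWritable(max);
  }
}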

>
> Note that currently Hadoop invokes the combiner exactly once. There
> is a jira issue filed to fix that. *smile*
>

Could you point me to the jira issue related to this? I have already
implemented some things that are potentially related, such as combining
across map spills during the merge at the completion of a map task.

> > This leads to a number of limitations, chief among them the fact
> > that the
> > combiner cannot be applied more than once because there are no
> > guarantees
> > regarding the effects of repeatedly using the combiner (as
> > implemented, the
> > combiner could produce more than one output pair or change the key).
>
> As I said in the previous point, the combiner can be invoked more
> than once and should be. It currently does not. Applications are
> required to keep the combiners pure. I hope it does not break too
> many applications when we fix this.
>
> > A summary of desirable semantics:
> >    1 The map function produces as output partial aggregate values
> >       representing singletons.
> >    2 A new combiner function that explicitly performs partial to
> > partial
> >       aggregation over one or more values, creating one new output
> > value of
> >       the same type as the input value and not changing the key.
> >    3 A reducer which takes as input partial aggregates and produces
> > final
> >       values of any format.
>
> Basically, we already have this, except that we allow the combiner to
> emit multiple records. Multiple records out of the combiner is not as
> clean, but in practice I don't think it hurts anything.
>
A potential problem caused by allowing a combiner to output multiple
records is that it could break future optimizations. With a combiner
defined, one could, for instance, pipeline some combining within the
reducer using a means other than sorting, such as a tree or hash table. In
those cases, one might require a combiner to produce a single new value for
the given key.
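
As a rough illustration of what that could enable (plain Java, all names and
the Long value type illustrative), partials could be folded into a hash
table as finished map outputs arrive, rather than waiting for a full sort:

import java.util.HashMap;
import java.util.Map;

// Sketch of pipelined, hash-based partial aggregation on the reduce side.
// This only works if the combine step returns exactly one value per key and
// never changes the key.
public class HashPartialAggregator {
  private final Map<String, Long> partials = new HashMap<String, Long>();

  // Called once per (key, partial value) as finished map outputs are copied.
  public void accumulate(String key, long partialValue) {
    Long current = partials.get(key);
    partials.put(key, current == null ? partialValue : current + partialValue);
  }

  // Handed to the final reduce once all map outputs have arrived.
  public Map<String, Long> snapshot() {
    return partials;
  }
}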

-John

Re: A Case for Stronger Partial Aggregation Semantics in Hadoop

Posted by Owen O'Malley <oo...@yahoo-inc.com>.
On Jul 26, 2007, at 5:02 PM, John M Cieslewicz wrote:
> The combiner semantics, however, are the same as the reducer’s and
> there is nothing to prevent a programmer from implementing a  
> combiner that
> changes the value of the key or outputs more or less than one key- 
> value
> pair.

The combiner and reducer share an interface. However, the semantics  
are different. In particular,
   1. Combiners may be invoked once or many times on each of the map  
outputs, while reduces will be invoked exactly once on each key.
   2. As a result of that, combiners effectively can not have side  
effects, while reduces can.
   3. Reduces can emit different types than their inputs, combiners  
can not.
   4. Reduces can change the key, while combiners are required not  
to. Currently this is not checked dynamically, although it should be.  
(Things will break badly if combiners do this...)

Note that currently Hadoop invokes the combiner exactly once. There  
is a jira issue filed to fix that. *smile*

> This leads to a number of limitations, chief among them the fact  
> that the
> combiner cannot be applied more than once because there are no  
> guarantees
> regarding the effects of repeatedly using the combiner (as  
> implemented, the
> combiner could produce more than one output pair or change the key).

As I said in the previous point, the combiner can be invoked more  
than once and should be. It currently does not. Applications are  
required to keep the combiners pure. I hope it does not break too  
many applications when we fix this.

> A summary of desirable semantics:
>    1 The map function produces as output partial aggregate values
>       representing singletons.
>    2 A new combiner function that explicitly performs partial to  
> partial
>       aggregation over one or more values, creating one new output  
> value of
>       the same type as the input value and not changing the key.
>    3 A reducer which takes as input partial aggregates and produces  
> final
>       values of any format.

Basically, we already have this, except that we allow the combiner to  
emit multiple records. Multiple records out of the combiner is not as  
clean, but in practice I don't think it hurts anything.

> This proposal requires a slightly more restrictive combiner, but  
> with the
> ability to apply this new combiner function repeatedly, one can  
> obtain some
> benefits, including:
>    1 Rather than just combining within a mapper’s output spill, one  
> could
>       repeat the process during the merge of spills, further  
> reducing the
>       amount of data to be transferred.
>    2 The reducer can be more aggressively pipelined with partial
>       aggregation occurring among the finished map outputs while the
>       reducer waits for later map tasks to complete. In this  
> manner, some
>       of the aggregation can be pushed into the sort and merge phases.

You are right that combiners on the reduce side also likely make  
sense on the output of the merge. The payback is less because the  
data isn't likely to be large, but for some applications, it may be  
significant.

Do note, however, that combiners are not free. They force the objects to be
serialized and deserialized an extra time, and they add their own execution
time. In general, if the user has asked for them, they will reduce the data,
but not always.

-- Owen



Re: A Case for Stronger Partial Aggregation Semantics in Hadoop

Posted by Doug Cutting <cu...@apache.org>.
John M Cieslewicz wrote:
> A summary of desirable semantics:
>    1 The map function produces as output partial aggregate values
>       representing singletons.
>    2 A new combiner function that explicitly performs partial to partial
>       aggregation over one or more values, creating one new output value of
>       the same type as the input value and not changing the key.
>    3 A reducer which takes as input partial aggregates and produces final
>       values of any format.

This seems consistent with all uses of combiners I have seen.  It seems 
we could start by simply adding a few tests:

1. That combiners do not alter keys;
2. That combiners always output a single value;

Then, subsequently, we can add optimizations that take advantage of this. If
we decide to go this direction, it might be worth adding these tests sooner
rather than later.
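
One way such a dynamic check could look (a sketch only; the wrapper name is
hypothetical and the old-style non-generic OutputCollector is assumed) is a
collector the framework wraps around the combiner's output:

import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;

// Hypothetical checking collector: rejects key changes and multiple outputs.
public class CheckingCollector implements OutputCollector {
  private final OutputCollector wrapped;
  private final WritableComparable expectedKey;
  private int emitted = 0;

  public CheckingCollector(OutputCollector wrapped, WritableComparable expectedKey) {
    this.wrapped = wrapped;
    this.expectedKey = expectedKey;
  }

  public void collect(WritableComparable key, Writable value) throws IOException {
    if (!expectedKey.equals(key)) {
      throw new IOException("combiner changed the key: " + key);
    }
    if (++emitted > 1) {
      throw new IOException("combiner emitted more than one value for key " + expectedKey);
    }
    wrapped.collect(key, value);
  }
}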

Alternately, we could add a new Combiner interface that enforces these:

public interface Combiner {
   /** Combine all values passed into a single value that is returned. */
   public Writable combine(WritableComparable key, Iterator values);
}

We should also make it clear to programmers that values may be 
combined more than once between map and reduce.  Besides documentation, 
a good way to make folks aware of that is to implement it.

> I am currently working to get some performance numbers related to pushing
> aggregation into the copying and merging phase of reduce.

Please share your results when you have them!

Thanks,

Doug