Posted to dev@spark.apache.org by Andres Perez <an...@tresata.com> on 2016/05/19 18:12:31 UTC

Dataset reduceByKey

Hi all,

We were in the process of porting an RDD program to one that uses
Datasets. Most things were easy to transition, but one hole in
functionality we found was the ability to reduce a Dataset by key,
something akin to PairRDDFunctions.reduceByKey. Our first attempt at adding
the functionality ourselves involved creating a KeyValueGroupedDataset and
calling reduceGroups to get the reduced Dataset:

  class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
    def reduceByKey(func: (V, V) => V)
        (implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
      ds.groupByKey(_._1)
        .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) }
        .map { case (k, (_, v)) => (k, v) }
  }
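
For reference, here is a rough usage sketch of this enrichment. The implicit
conversion and the local SparkSession setup are illustrative glue, not part
of Spark or of the class above:

  import scala.reflect.ClassTag
  import org.apache.spark.sql.{Dataset, SparkSession}

  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._

  // bring the enrichment into scope via an implicit conversion
  implicit def toRichPair[K, V: ClassTag](ds: Dataset[(K, V)]): RichPairDataset[K, V] =
    new RichPairDataset(ds)

  val ds = Seq(("a", 1), ("a", 2), ("b", 3)).toDS()
  val reduced = ds.reduceByKey(_ + _)   // expected: ("a", 3), ("b", 3)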

Note that the function passed into .reduceGroups takes in the full key-value
pair. It'd be nicer to pass in a function that maps just the values, i.e.
reduceGroups(func). This would require the ability to modify the values of
the KeyValueGroupedDataset (which is returned by the .groupByKey call on a
Dataset). Such a function (e.g., KeyValueGroupedDataset.mapValues(func: V
=> U)) does not currently exist.
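
For illustration, reduceByKey could then be written roughly as follows,
where mapValues is the proposed, hypothetical method, not something
KeyValueGroupedDataset provides today:

    def reduceByKey(func: (V, V) => V)
        (implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
      ds.groupByKey(_._1)
        .mapValues(_._2)       // hypothetical: drop the duplicated key from the value side
        .reduceGroups(func)    // the reduce function now sees only the values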

The more important issue, however, is the inefficiency of .reduceGroups.
The function does not support partial aggregation (reducing map-side), and
as a result requires shuffling all the data in the Dataset. A more
efficient alternative that we explored involved creating a Dataset from
the KeyValueGroupedDataset by creating an Aggregator and passing it as a
TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, the
Aggregator necessitated the creation of a zero to form a valid monoid.
However, the zero depends on the reduce function: the zero for addition on
Ints (0), for example, differs from the zero for taking the minimum over
Ints (Int.MaxValue). The Aggregator requires that we not break the rule
reduce(a, zero) == a. To satisfy this we had to create an Aggregator whose
buffer type stores the value along with a null flag, which the zero element
uses to signal that it should not participate in the reduce function (using
Scala's nice Option syntax here yielded some mysterious errors that I
haven't worked through yet, unfortunately).
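
For concreteness, here is a minimal sketch of that Aggregator, using a
plain Boolean "defined" flag in the buffer in place of the Option. The
class name ReduceByKeyAggregator and the exact encoder wiring are
illustrative, not an existing Spark API:

  import org.apache.spark.sql.{Encoder, Encoders}
  import org.apache.spark.sql.expressions.Aggregator

  class ReduceByKeyAggregator[K, V](func: (V, V) => V)(implicit vEnc: Encoder[V])
      extends Aggregator[(K, V), (Boolean, V), V] {

    // The zero is flagged as "empty" so it never participates in func.
    def zero: (Boolean, V) = (false, null.asInstanceOf[V])

    def reduce(b: (Boolean, V), kv: (K, V)): (Boolean, V) =
      if (b._1) (true, func(b._2, kv._2)) else (true, kv._2)

    def merge(b1: (Boolean, V), b2: (Boolean, V)): (Boolean, V) =
      if (!b1._1) b2
      else if (!b2._1) b1
      else (true, func(b1._2, b2._2))

    def finish(b: (Boolean, V)): V = b._2

    def bufferEncoder: Encoder[(Boolean, V)] =
      Encoders.tuple(Encoders.scalaBoolean, vEnc)
    def outputEncoder: Encoder[V] = vEnc
  }

  // reduceByKey can then delegate to .agg, which does support partial
  // aggregation, e.g.:
  //   ds.groupByKey(_._1).agg(new ReduceByKeyAggregator[K, V](func).toColumn)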

-Andy

Re: Dataset reduceByKey

Posted by Reynold Xin <rx...@databricks.com>.
Here's a ticket: https://issues.apache.org/jira/browse/SPARK-15598



On Fri, May 20, 2016 at 12:35 AM, Reynold Xin <rx...@databricks.com> wrote:

> Andres - this is great feedback. Let me think about it a little bit more
> and reply later.

Re: Dataset reduceByKey

Posted by Reynold Xin <rx...@databricks.com>.
Andres - this is great feedback. Let me think about it a little bit more
and reply later.

