Posted to issues@spark.apache.org by "Paul Jones (Jira)" <ji...@apache.org> on 2020/01/10 05:00:00 UTC
[jira] [Updated] (SPARK-30477) More KeyValueGroupedDataset methods
should be composable
[ https://issues.apache.org/jira/browse/SPARK-30477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Paul Jones updated SPARK-30477:
-------------------------------
Description:
Right now many `KeyValueGroupedDataset` methods do not return a `KeyValueGroupedDataset`. In some cases this means we have to perform multiple `groupByKey`s in order to express certain patterns.
Setup
{code:scala}
def f: T => K
def g: U => K
def h: V => K
val ds1: Dataset[T] = ???
val ds2: Dataset[U] = ???
val ds3: Dataset[V] = ???
val kvDs1: KeyValueGroupedDataset[K, T] = ds1.groupByKey(f)
val kvDs2: KeyValueGroupedDataset[K, U] = ds2.groupByKey(g)
val kvDs3: KeyValueGroupedDataset[K, V] = ds3.groupByKey(h)
{code}
Example one: Combining multiple cogrouped Datasets.
{code:scala}
// Current: each coGroup returns a Dataset, so we must re-group between steps
kvDs1
  .coGroup(kvDs2) { (k: K, it1: Iterator[T], it2: Iterator[U]) => (??? : X) }
  .groupByKey((x: X) => (??? : K))
  .coGroup(kvDs3) { (k: K, it1: Iterator[X], it2: Iterator[V]) => (??? : Z) }
// Wanted
trait KeyValueGroupedDataset[K, T] {
  def coGroupKeyValueGroupedDataset[U, X](r: KeyValueGroupedDataset[K, U])(
      f: (K, Iterator[T], Iterator[U]) => X): KeyValueGroupedDataset[K, X]
}
kvDs1
  .coGroupKeyValueGroupedDataset(kvDs2) { (k: K, it1: Iterator[T], it2: Iterator[U]) => (??? : X) }
  .coGroupKeyValueGroupedDataset(kvDs3) { (k: K, it1: Iterator[X], it2: Iterator[V]) => (??? : Z) }
{code}
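The intended keyed-cogroup semantics can be sketched with plain Scala collections (no Spark; `cogroupKeyed` and everything in this block are hypothetical illustrations, not the proposed API):
{code:scala}
// Illustrative only: a keyed cogroup over ordinary Maps. The point is that
// the result is again keyed by K, so a second cogroup chains directly,
// with no re-grouping step in between.
def cogroupKeyed[K, T, U, X](
    left: Map[K, Seq[T]], right: Map[K, Seq[U]])(
    f: (K, Iterator[T], Iterator[U]) => X): Map[K, Seq[X]] =
  (left.keySet ++ right.keySet).map { k =>
    k -> Seq(f(k,
      left.getOrElse(k, Seq.empty).iterator,
      right.getOrElse(k, Seq.empty).iterator))
  }.toMap
{code}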
Example two: Combining a reduceGroups with a coGroup
{code:scala}
// Current: reduceGroups returns a Dataset[(K, T)], forcing a second groupByKey
val newDs1: Dataset[X] = kvDs1
  .reduceGroups((l: T, r: T) => (??? : T))
  .groupByKey { case (k, _) => k }
  .mapValues { case (_, v) => v }
  .coGroup(kvDs2) { (k: K, it1: Iterator[T], it2: Iterator[U]) => (??? : X) }
// Wanted
trait KeyValueGroupedDataset[K, T] {
  def reduceGroupsKeyValueGroupedDataset(f: (T, T) => T): KeyValueGroupedDataset[K, T]
}
val newDs2: Dataset[X] = kvDs1
  .reduceGroupsKeyValueGroupedDataset((l: T, r: T) => (??? : T))
  .coGroup(kvDs2) { (k: K, it1: Iterator[T], it2: Iterator[U]) => (??? : X) }
{code}
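Similarly, a reduce that keeps its result keyed can be sketched over plain collections (illustrative only, not Spark API):
{code:scala}
// Illustrative only: values are merged per key and the result stays keyed,
// so it could feed straight into a keyed cogroup with no extra groupByKey.
def reduceGroupsKeyed[K, T](groups: Map[K, Seq[T]])(f: (T, T) => T): Map[K, T] =
  groups.collect { case (k, vs) if vs.nonEmpty => k -> vs.reduce(f) }
// e.g. reduceGroupsKeyed(Map("a" -> Seq(1, 2, 3)))(_ + _) == Map("a" -> 6)
{code}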
In both cases, not only are the ergonomics better, but Spark will also be better able to optimize the code.
For almost every method of `KeyValueGroupedDataset` we should have a matching method that returns a `KeyValueGroupedDataset`.
We can also add a `.toDs` method which converts a `KeyValueGroupedDataset[K, V]` to a `Dataset[(K, V)]`.
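For reference, something close to the proposed `.toDs` is already expressible via the existing `flatMapGroups` (a sketch, assuming an `Encoder[(K, V)]` is in scope; a first-class method could presumably avoid the detour):
{code:scala}
// Sketch: flatten a KeyValueGroupedDataset back to its key/value pairs
// using the existing flatMapGroups API.
def toDs[K, V](kv: KeyValueGroupedDataset[K, V])(
    implicit enc: Encoder[(K, V)]): Dataset[(K, V)] =
  kv.flatMapGroups((k: K, it: Iterator[V]) => it.map(v => (k, v)))
{code}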
> More KeyValueGroupedDataset methods should be composable
> --------------------------------------------------------
>
> Key: SPARK-30477
> URL: https://issues.apache.org/jira/browse/SPARK-30477
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.4
> Reporter: Paul Jones
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)