Posted to issues@spark.apache.org by "Paul Jones (Jira)" <ji...@apache.org> on 2020/01/10 05:00:00 UTC

[jira] [Updated] (SPARK-30477) More KeyValueGroupedDataset methods should be composable

     [ https://issues.apache.org/jira/browse/SPARK-30477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Jones updated SPARK-30477:
-------------------------------
    Description: 
Right now many `KeyValueGroupedDataset` methods do not return a `KeyValueGroupedDataset`. In some cases this means we have to call `groupByKey` multiple times in order to express certain patterns.


Setup

{code:scala}
def f: T => K
def g: U => K
def h: V => K
val ds1: Dataset[T] = ???
val ds2: Dataset[U] = ???
val ds3: Dataset[V] = ??? 
val kvDs1: KeyValueGroupedDataset[K, T] = ds1.groupByKey(f)
val kvDs2: KeyValueGroupedDataset[K, U] = ds2.groupByKey(g)
val kvDs3: KeyValueGroupedDataset[K, V] = ds3.groupByKey(h)
{code}

Example one: Combining multiple cogrouped Datasets.

{code:scala}
// Current: the result of cogroup is a plain Dataset, so it must be grouped again
kvDs1
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
  .groupByKey((x: X) => ???: K)
  .cogroup(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Z)

// Wanted: a cogroup variant whose result stays grouped by K
trait KeyValueGroupedDataset[K, T] {
  def coGroupKeyValueGroupedDataset[U, X](r: KeyValueGroupedDataset[K, U])(f: (K, Iterator[T], Iterator[U]) => X): KeyValueGroupedDataset[K, X]
}

kvDs1
  .coGroupKeyValueGroupedDataset(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
  .coGroupKeyValueGroupedDataset(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Z)
{code}
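
Until something like this exists, the re-keying can at least be hidden behind a helper. The following is only a sketch under stated assumptions: `ComposableCogroupSyntax`, `ComposableCogroupOps` and `cogroupKeyed` are made-up names, not Spark API, and the combining function is assumed to return exactly one `X` per key. The helper still runs a second `groupByKey` internally, so it only improves ergonomics and does not give Spark anything extra to optimize.

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders, KeyValueGroupedDataset}

object ComposableCogroupSyntax {
  // Hypothetical extension; neither the class nor the method is part of Spark.
  implicit class ComposableCogroupOps[K: Encoder, T](left: KeyValueGroupedDataset[K, T]) {
    def cogroupKeyed[U, X: Encoder](right: KeyValueGroupedDataset[K, U])(
        f: (K, Iterator[T], Iterator[U]) => X): KeyValueGroupedDataset[K, X] = {
      // Carry the key next to each result so the output can be grouped again.
      implicit val pairEncoder: Encoder[(K, X)] =
        Encoders.tuple(implicitly[Encoder[K]], implicitly[Encoder[X]])
      left
        .cogroup(right)((k: K, l: Iterator[T], r: Iterator[U]) => Iterator((k, f(k, l, r))))
        .groupByKey((p: (K, X)) => p._1) // the extra shuffle is still here, just hidden
        .mapValues((p: (K, X)) => p._2)  // drop the key from the values again
    }
  }
}
{code}

With `import ComposableCogroupSyntax._` in scope, and encoders for `K` and `X` available, two such calls can be chained without an explicit `groupByKey` in between.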

Example two: Combining `reduceGroups` with `cogroup`
{code:scala}
// Current: reduceGroups returns a Dataset[(K, T)], so the key has to be re-extracted
val newDs1: Dataset[X] = kvDs1
  .reduceGroups((l: T, r: T) => ???: T)
  .groupByKey { case (k, _) => k }
  .mapValues { case (_, v) => v }
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)

// Wanted: a reduceGroups variant whose result stays grouped by K
trait KeyValueGroupedDataset[K, T] {
  def reduceGroupsKeyValueGroupedDataset(f: (T, T) => T): KeyValueGroupedDataset[K, T]
}

val newDs2: Dataset[X] = kvDs1
  .reduceGroupsKeyValueGroupedDataset((l: T, r: T) => ???: T)
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
{code}
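
For illustration, the current workaround can be wrapped in an extension method today. This is a sketch, not a proposal for the final API: `ComposableReduceSyntax`, `ComposableReduceOps` and `reduceGroupsKeyed` are hypothetical names, and the body is just the `reduceGroups` / `groupByKey` / `mapValues` chain from above, so the second grouping still happens.

{code:scala}
import org.apache.spark.sql.{Encoder, KeyValueGroupedDataset}

object ComposableReduceSyntax {
  // Hypothetical extension; neither the class nor the method is part of Spark.
  implicit class ComposableReduceOps[K: Encoder, T: Encoder](grouped: KeyValueGroupedDataset[K, T]) {
    def reduceGroupsKeyed(f: (T, T) => T): KeyValueGroupedDataset[K, T] =
      grouped
        .reduceGroups(f)                 // Dataset[(K, T)]: one reduced value per key
        .groupByKey((p: (K, T)) => p._1) // group again by the same key
        .mapValues((p: (K, T)) => p._2)  // drop the key from the values
  }
}
{code}

After `import ComposableReduceSyntax._`, `kvDs1.reduceGroupsKeyed(...)` can be followed directly by `cogroup`, provided encoders for `K` and `T` are in scope.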

In both cases the ergonomics are better, and Spark should also be better able to optimize the code because the extra `groupByKey` is no longer needed.

For almost every method of `KeyValueGroupedDataset` we should have a matching method that returns a `KeyValueGroupedDataset`. 

We can also add a `.toDs` method which converts a `KeyValueGroupedDataset[K, V]` to a `Dataset[(K, V)]`.
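
One possible reading of `.toDs` is "emit one `(key, value)` pair per grouped value". A rough sketch of that reading using the existing `flatMapGroups` follows; `ToDsSyntax` and `ToDsOps` are hypothetical names, and a built-in version could presumably avoid the per-group function call.

{code:scala}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, KeyValueGroupedDataset}

object ToDsSyntax {
  // Hypothetical extension; `toDs` does not exist on KeyValueGroupedDataset.
  implicit class ToDsOps[K: Encoder, V: Encoder](grouped: KeyValueGroupedDataset[K, V]) {
    def toDs: Dataset[(K, V)] = {
      // Build the tuple encoder from the key and value encoders already in scope.
      implicit val pairEncoder: Encoder[(K, V)] =
        Encoders.tuple(implicitly[Encoder[K]], implicitly[Encoder[V]])
      // Pair every value in a group with that group's key.
      grouped.flatMapGroups((k: K, values: Iterator[V]) => values.map(v => (k, v)))
    }
  }
}
{code}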



> More KeyValueGroupedDataset methods should be composable
> --------------------------------------------------------
>
>                 Key: SPARK-30477
>                 URL: https://issues.apache.org/jira/browse/SPARK-30477
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: Paul Jones
>            Priority: Major


