Posted to issues@spark.apache.org by "Paul Jones (Jira)" <ji...@apache.org> on 2020/01/10 03:25:00 UTC

[jira] [Created] (SPARK-30477) More KeyValueGroupedDataset methods should be composable

Paul Jones created SPARK-30477:
----------------------------------

             Summary: More KeyValueGroupedDataset methods should be composable
                 Key: SPARK-30477
                 URL: https://issues.apache.org/jira/browse/SPARK-30477
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.4
            Reporter: Paul Jones


Right now many `KeyValueGroupedDataset` methods do not return a `KeyValueGroupedDataset`. In some cases this means we have to call `groupByKey` multiple times in order to express certain patterns.


Setup

{code:scala}
def f: T => K
def g: U => K
def h: V => K
val ds1: Dataset[T] = ???
val ds2: Dataset[U] = ???
val ds3: Dataset[V] = ??? 
val kvDs1: KeyValueGroupedDataset[K, T] = ds1.groupByKey(f)
val kvDs2: KeyValueGroupedDataset[K, U] = ds2.groupByKey(g)
val kvDs3: KeyValueGroupedDataset[K, V] = ds3.groupByKey(h)
{code}

Example one: Combining multiple cogrouped Datasets.

{code:scala}
// Current: cogroup returns a Dataset, so the result must be regrouped before the next cogroup
kvDs1
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])
  .groupByKey((x: X) => ???: K)
  .cogroup(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: TraversableOnce[Z])

// Wanted: a cogroup variant that keeps the grouping
trait KeyValueGroupedDataset[K, T] {
  def coGroupKeyValueGroupedDataset[U, X: Encoder](r: KeyValueGroupedDataset[K, U])(f: (K, Iterator[T], Iterator[U]) => TraversableOnce[X]): KeyValueGroupedDataset[K, X]
}

kvDs1
  .coGroupKeyValueGroupedDataset(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])
  .coGroupKeyValueGroupedDataset(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: TraversableOnce[Z])
{code}
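
To make the "Current" pattern concrete, here is a minimal sketch using only existing `Dataset` and `KeyValueGroupedDataset` methods. The case classes, the `CogroupChain` object and the sample data are hypothetical and exist only for illustration; the sketch assumes Spark SQL is on the classpath and runs with a local master.

{code:scala}
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record types, used only for this illustration.
case class Order(userId: Long, amount: Double)
case class Click(userId: Long, page: String)
case class Refund(userId: Long, amount: Double)
case class Activity(userId: Long, spent: Double, clicks: Int)
case class Summary(userId: Long, net: Double, clicks: Int)

object CogroupChain {
  def summarize(orders: Dataset[Order],
                clicks: Dataset[Click],
                refunds: Dataset[Refund]): Dataset[Summary] = {
    import orders.sparkSession.implicits._

    // First cogroup: the result is a plain Dataset[Activity], so the grouping is lost.
    val activity: Dataset[Activity] = orders.groupByKey(_.userId)
      .cogroup(clicks.groupByKey(_.userId)) {
        (k: Long, os: Iterator[Order], cs: Iterator[Click]) =>
          Iterator(Activity(k, os.map(_.amount).sum, cs.size))
      }

    // The key has to be re-derived with a second groupByKey before the next cogroup.
    activity.groupByKey(_.userId)
      .cogroup(refunds.groupByKey(_.userId)) {
        (k: Long, acts: Iterator[Activity], rs: Iterator[Refund]) =>
          val refunded = rs.map(_.amount).sum
          acts.map(a => Summary(k, a.spent - refunded, a.clicks))
      }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cogroup-chain").getOrCreate()
    import spark.implicits._

    val orders  = Seq(Order(1L, 10.0), Order(1L, 5.0), Order(2L, 7.0)).toDS()
    val clicks  = Seq(Click(1L, "home"), Click(2L, "cart"), Click(2L, "pay")).toDS()
    val refunds = Seq(Refund(1L, 3.0)).toDS()

    summarize(orders, clicks, refunds).show()
    spark.stop()
  }
}
{code}

The second `groupByKey(_.userId)` recomputes a key that the first cogroup already had, which is the step the proposed `coGroupKeyValueGroupedDataset` would make unnecessary.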

Example two: Combining a `reduceGroups` with a `cogroup`.
{code:scala}
// Current: reduceGroups returns a Dataset[(K, T)], which must be regrouped and unwrapped
val newDs1: Dataset[X] = kvDs1
  .reduceGroups((l: T, r: T) => ???: T)
  .groupByKey { case (k, _) => k }.mapValues { case (_, v) => v }
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])

// Wanted: a reduceGroups variant that keeps the grouping
trait KeyValueGroupedDataset[K, T] {
  def reduceGroupsKeyValueGroupedDataset(f: (T, T) => T): KeyValueGroupedDataset[K, T]
}

val newDs2: Dataset[X] = kvDs1
  .reduceGroupsKeyValueGroupedDataset((l: T, r: T) => ???: T)
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])
{code}
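
As a concrete instance of the "Current" workaround, the following sketch reduces each group to a single value and then cogroups it with a second dataset. `Reading`, `Threshold`, `Alert` and `ReduceThenCogroup` are hypothetical names chosen for illustration; only existing Spark methods (`groupByKey`, `reduceGroups`, `mapValues`, `cogroup`) are used, and Spark SQL is assumed to be on the classpath.

{code:scala}
import org.apache.spark.sql.Dataset

// Hypothetical record types, used only for this illustration.
case class Reading(sensor: String, value: Double)
case class Threshold(sensor: String, limit: Double)
case class Alert(sensor: String, peak: Double, limit: Double)

object ReduceThenCogroup {
  def alerts(readings: Dataset[Reading], thresholds: Dataset[Threshold]): Dataset[Alert] = {
    import readings.sparkSession.implicits._

    // reduceGroups yields Dataset[(String, Reading)]: one (key, peak reading) pair per sensor.
    val peaks: Dataset[(String, Reading)] = readings.groupByKey(_.sensor)
      .reduceGroups((a: Reading, b: Reading) => if (a.value >= b.value) a else b)

    peaks
      .groupByKey(_._1)   // regroup on the key that reduceGroups already had
      .mapValues(_._2)    // drop the key from the value again
      .cogroup(thresholds.groupByKey(_.sensor)) {
        (k: String, ps: Iterator[Reading], ts: Iterator[Threshold]) =>
          val limits = ts.map(_.limit).toList
          // Emit one alert per threshold that the peak reading exceeds.
          ps.flatMap(p => limits.filter(p.value > _).map(l => Alert(k, p.value, l)))
      }
  }
}
{code}

The `groupByKey(_._1).mapValues(_._2)` pair only restores the shape that existed before `reduceGroups`; a `reduceGroupsKeyValueGroupedDataset` as proposed above would remove both calls.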

In both cases, not only are the ergonomics better, but Spark will also be better able to optimize the code, because the grouping is preserved rather than rebuilt by an extra `groupByKey`.

For almost every method of `KeyValueGroupedDataset` we should have a matching method that returns a `KeyValueGroupedDataset`. 

We can also add a `.toDs` method, which converts a `KeyValueGroupedDataset[K, V]` to a `Dataset[(K, V)]`.
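
Until something like this exists in Spark itself, `toDs` can be approximated in user code. The sketch below is one possible shape, not an existing Spark API: `KeyValueGroupedDatasetSyntax` and `ToDsOps` are hypothetical names, and the conversion is built on the existing `flatMapGroups`, so it still goes through the normal grouping machinery and a native implementation could likely be cheaper.

{code:scala}
import org.apache.spark.sql.{Dataset, Encoder, KeyValueGroupedDataset}

object KeyValueGroupedDatasetSyntax {
  // Hypothetical extension: `toDs` is the name proposed in this issue,
  // not a method that Spark provides.
  implicit class ToDsOps[K, V](val kv: KeyValueGroupedDataset[K, V]) extends AnyVal {
    // Pairs every value with the key of its group.
    def toDs(implicit enc: Encoder[(K, V)]): Dataset[(K, V)] =
      kv.flatMapGroups((k: K, vs: Iterator[V]) => vs.map(v => (k, v)))
  }
}
{code}

With `import KeyValueGroupedDatasetSyntax._` and `spark.implicits._` in scope, `ds.groupByKey(f).toDs` would then yield a `Dataset[(K, T)]`.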


