Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/02 09:12:15 UTC

[GitHub] [spark] EnricoMi commented on a diff in pull request #35899: [SPARK-38591][SQL] Add flatMapSortedGroups to KeyValueGroupedDataset

EnricoMi commented on code in PR #35899:
URL: https://github.com/apache/spark/pull/35899#discussion_r862699830


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -171,6 +171,86 @@ class KeyValueGroupedDataset[K, V] private[sql](
     flatMapGroups((key, data) => f.call(key, data.asJava).asScala)(encoder)
   }
 
+  /**
+   * (Scala-specific)
+   * Applies the given function to each group of data.  For each unique group, the function will
+   * be passed the group key and a sorted iterator that contains all of the elements in the group.
+   * The function can return an iterator containing elements of an arbitrary type which will be
+   * returned as a new [[Dataset]].
+   *
+   * This function does not support partial aggregation, and as a result requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an aggregation over each
+   * key, it is best to use the reduce function or an
+   * `org.apache.spark.sql.expressions#Aggregator`.
+   *
+   * Internally, the implementation will spill to disk if any given group is too large to fit into
+   * memory.  However, users must take care to avoid materializing the whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 3.4.0
+   */
+  def flatMapSortedGroups[S: Encoder, U : Encoder]

Review Comment:
   With `V => S` you can pick any columns of `V` you like:
   
   ```
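   // field types are illustrative; the typed grouping API is groupByKey, not groupBy
   case class Value(id: Int, seq: Int, timestamp: Long, value: Double)
   val ds: Dataset[Value] = ???  // any existing Dataset[Value]
   ds.groupByKey(v => v.id).flatMapSortedGroups(v => (v.seq, v.timestamp)) { (_, iter) => iter }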
   ```
   
   The sort direction could be added to the function as a parameter with a default, so the user can easily specify it:
   `(s: V => S, direction: SortDirection = Ascending)`
   
   The `Column`-variant of `flatMapSortedGroups` below also allows for any number of columns and sort direction.
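   
   To make the intended semantics concrete, here is a rough model on plain Scala collections. It is only a sketch and not the PR's implementation: `flatMapSortedGroupsModel`, the local `SortDirection` / `Ascending` / `Descending` stand-ins, and the field types of `Value` are all assumptions made for illustration, and no Spark classes are involved.
   
   ```scala
   object SortedGroupsSketch {
     // Local stand-ins for a sort direction (hypothetical, not Spark's classes).
     sealed trait SortDirection
     case object Ascending extends SortDirection
     case object Descending extends SortDirection
   
     // Field types are assumed for the example.
     case class Value(id: Int, seq: Int, timestamp: Long, value: Double)
   
     // Groups `data` by `key`, sorts each group by `s` in the given direction,
     // then hands the key and the sorted iterator to `f`.
     // The order in which groups are visited is unspecified.
     def flatMapSortedGroupsModel[K, V, S: Ordering, U](
         data: Seq[V], key: V => K)(
         s: V => S, direction: SortDirection = Ascending)(
         f: (K, Iterator[V]) => Iterator[U]): Seq[U] = {
       val ord: Ordering[S] = direction match {
         case Ascending  => implicitly[Ordering[S]]
         case Descending => implicitly[Ordering[S]].reverse
       }
       data.groupBy(key).toSeq.flatMap { case (k, vs) =>
         f(k, vs.sortBy(s)(ord).iterator)
       }
     }
   
     val rows: Seq[Value] = Seq(
       Value(1, 2, 20L, 0.2),
       Value(1, 1, 10L, 0.1),
       Value(2, 1, 30L, 0.3))
   
     // Sort each group by (seq, timestamp), using the default direction.
     val ascending: Seq[Int] =
       flatMapSortedGroupsModel(rows, (v: Value) => v.id)(
         v => (v.seq, v.timestamp)) { (_, iter) => iter }
         .filter(_.id == 1).map(_.seq)
   
     // Same sort key, explicit descending direction.
     val descending: Seq[Int] =
       flatMapSortedGroupsModel(rows, (v: Value) => v.id)(
         v => (v.seq, v.timestamp), Descending) { (_, iter) => iter }
         .filter(_.id == 1).map(_.seq)
   }
   ```
   
   The point of the model is that `S` can be any type with an ordering (here a tuple of two fields of `V`), and that the direction is just a second argument with a default.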



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

