Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/17 07:58:35 UTC

[GitHub] [spark] EnricoMi opened a new pull request, #37551: [SPARK-38591][SQL] Add flatMapSortedGroups and cogroupSorted to KeyValueGroupedDataset

EnricoMi opened a new pull request, #37551:
URL: https://github.com/apache/spark/pull/37551

   ### What changes were proposed in this pull request?
   This adds sorted versions of `ds.groupByKey(…).flatMapGroups(…)` and `ds.groupByKey(…).cogroup(…)`.
   
   ### Why are the changes needed?
   The existing methods `flatMapGroups` and `cogroup` provide an iterator of rows for each group key. If user code requires those rows in a particular order, the iterator has to be materialized and sorted first, which defeats the purpose of an iterator. `flatMapGroups` and `cogroup` have the great advantage that they work with groups that are too large to fit into the memory of one executor. Sorting inside the user function breaks this property.
   
   [org.apache.spark.sql.KeyValueGroupedDataset](https://github.com/apache/spark/blob/47485a3c2df3201c838b939e82d5b26332e2d858/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala#L134-L137):
   > Internally, the implementation will spill to disk if any given group is too large to fit into
   > memory.  However, users must take care to avoid materializing the whole iterator for a group
   > (for example, by calling `toList`) unless they are sure that this is possible given the memory
   > constraints of their cluster.
   
   The implementations of `KeyValueGroupedDataset.flatMapGroups` and `KeyValueGroupedDataset.cogroup` already sort each partition according to the group key. By additionally sorting by some data columns, the iterator can be guaranteed to provide the rows of each group in that order.
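
The idea described above can be sketched with plain Scala collections, without Spark. This is only an illustrative model, not the implementation in this PR: the object `SortedGroupsSketch` and its method signature are hypothetical, and the in-memory `sortBy` stands in for the per-partition sort that Spark can spill to disk. The point is that a single sort by (group key, secondary key) lets each group be handed to the user function as an already ordered iterator.

```scala
object SortedGroupsSketch {
  // Hypothetical helper, not Spark's API: sorts once by (group key, sort key)
  // and passes each group to `f` as an iterator over a contiguous run of rows.
  def flatMapSortedGroups[K, V, S, U](rows: Seq[V])(key: V => K, sortKey: V => S)(
      f: (K, Iterator[V]) => Iterator[U])(
      implicit keyOrd: Ordering[K], sortOrd: Ordering[S]): Vector[U] = {
    // One sort covers both grouping and the order within each group.
    val sorted = rows.sortBy(r => (key(r), sortKey(r))).iterator.buffered
    val out = Vector.newBuilder[U]
    while (sorted.hasNext) {
      val k = key(sorted.head)
      // The group is the run of consecutive rows that share the key `k`.
      val group = new Iterator[V] {
        def hasNext: Boolean = sorted.hasNext && keyOrd.equiv(key(sorted.head), k)
        def next(): V = sorted.next()
      }
      out ++= f(k, group)
      // Skip rows the user function left unread so the next group starts cleanly.
      while (group.hasNext) group.next()
    }
    out.result()
  }
}
```

The user function never has to collect a group into a list to sort it, so the property quoted above (do not materialize the whole iterator) is preserved.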
   
   ### Does this PR introduce _any_ user-facing change?
   This adds `KeyValueGroupedDataset.flatMapSortedGroups` and `KeyValueGroupedDataset.cogroupSorted`.
   
   ### How was this patch tested?
   These tests have been added:
   - `DatasetSuite."groupBy function, flatMapSorted by func"`
   - `DatasetSuite."groupBy function, flatMapSorted by expr"`
   - `DatasetSuite."cogroup sorted"`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1386664690

   It doesn't sound ideal to me if we expect users to read the whole method doc and figure out the caveat by themselves. This gives a far different user experience from `sortWithinPartitions`, while users would guess that `sortWithinGroups` is its counterpart API.
   
   If we really want to do this "generally", I'd say let's be consistent with `sortWithinPartitions` and explicitly add a Sort logical node that sorts by primary and secondary key. The order requirement in `flatMapGroups` then shouldn't trigger an additional sort, since the ordering of the DataFrame is a superset of the requirement.
   
   But if we just want to address this for only two methods, I'd say let's touch these methods instead of trying out an odd generalization.
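
The "superset" argument in the comment above can be illustrated with a small check. This is a simplified sketch under stated assumptions: sort orders are modeled as plain column-name sequences, and `OrderingPrefix.satisfies` is a hypothetical helper, not the planner's actual logic. Data sorted by (group key, secondary key) is also sorted by group key alone, so a requirement on the group key is already met.

```scala
object OrderingPrefix {
  // An existing sort order satisfies a required one when the required
  // columns form a prefix of the existing columns.
  def satisfies(existing: Seq[String], required: Seq[String]): Boolean =
    existing.startsWith(required)
}
```

The reverse does not hold: data sorted only by the group key gives no guarantee about the secondary key.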




[GitHub] [spark] EnricoMi commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
EnricoMi commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1398081395

   Closing as #39640 has been merged.




[GitHub] [spark] cloud-fan commented on a diff in pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37551:
URL: https://github.com/apache/spark/pull/37551#discussion_r1073105645


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -119,13 +130,66 @@ class KeyValueGroupedDataset[K, V] private[sql](
         Project(groupingAttributes, logicalPlan)))
   }
 
+  /**
+   * Returns a new [[KeyValueGroupedDataset]] with each group sorted by the given expressions.
+   * Operations that provide an iterator that contains all of the elements in a group will
+   * then provide a sorted iterator (flatMapGroups, mapGroups, cogroup).
+   *
+   * This is not supported for streaming Datasets (mapGroupsWithState, flatMapGroupsWithState).
+   *
+   * @tparam S The type of the sort value. Must be encodable to Spark SQL types.
+   * @param sortBy A function that provides a sort value for each row.
+   * @param direction The sort direction.
+   *
+   * @since 3.4.0
+   */
+  def sortWithinGroups[S: Encoder](
+      sortBy: V => S, direction: SortDirection = Ascending): KeyValueGroupedDataset[K, V] = {

Review Comment:
   I feel it's tricky to define the sort order with a class `S`. Shall we respect its `compareTo` method? There is no `Dataset.sortBy` with a class either. Shall we remove this overload?





[GitHub] [spark] EnricoMi closed pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
EnricoMi closed pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset
URL: https://github.com/apache/spark/pull/37551




[GitHub] [spark] cloud-fan commented on a diff in pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37551:
URL: https://github.com/apache/spark/pull/37551#discussion_r1073103770


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala:
##########
@@ -684,6 +692,8 @@ case class CoGroup(
     rightGroup: Seq[Attribute],
     leftAttr: Seq[Attribute],
     rightAttr: Seq[Attribute],
+    leftOrder: Option[Seq[SortOrder]],

Review Comment:
   can we simply use `Seq[SortOrder]`?
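
A possible reading of this suggestion, sketched with hypothetical types: `SortKey` below is a stand-in for illustration and is not Spark's `SortOrder`. With `Option[Seq[...]]` there are two ways to say "no extra ordering" (`None` and `Some(Nil)`), while a plain `Seq` has exactly one, the empty sequence.

```scala
object OrderSpec {
  // Hypothetical stand-in for a sort expression; not Spark's SortOrder.
  final case class SortKey(column: String, ascending: Boolean = true)

  // With a plain Seq, the empty sequence is the single "no extra sort" value.
  def describe(order: Seq[SortKey]): String =
    if (order.isEmpty) "group key only"
    else order
      .map(k => k.column + (if (k.ascending) " ASC" else " DESC"))
      .mkString("group key, ", ", ", "")

  // With Option, None and Some(Nil) both collapse to the same meaning.
  def describeOpt(order: Option[Seq[SortKey]]): String =
    describe(order.getOrElse(Nil))
}
```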





[GitHub] [spark] AmplabJenkins commented on pull request #37551: [SPARK-38591][SQL] Add flatMapSortedGroups and cogroupSorted to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1219169525

   Can one of the admins verify this patch?




[GitHub] [spark] cloud-fan commented on a diff in pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #37551:
URL: https://github.com/apache/spark/pull/37551#discussion_r1073102409


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala:
##########
@@ -386,16 +386,18 @@ case class AppendColumnsWithObject(
 /** Factory for constructing new `MapGroups` nodes. */
 object MapGroups {
   def apply[K : Encoder, T : Encoder, U : Encoder](
-      func: (K, Iterator[T]) => TraversableOnce[U],
-      groupingAttributes: Seq[Attribute],
-      dataAttributes: Seq[Attribute],
-      child: LogicalPlan): LogicalPlan = {
+    func: (K, Iterator[T]) => TraversableOnce[U],
+    groupingAttributes: Seq[Attribute],
+    dataAttributes: Seq[Attribute],
+    dataOrder: Seq[SortOrder],
+    child: LogicalPlan): LogicalPlan = {

Review Comment:
   ```suggestion
         func: (K, Iterator[T]) => TraversableOnce[U],
         groupingAttributes: Seq[Attribute],
         dataAttributes: Seq[Attribute],
         dataOrder: Seq[SortOrder],
         child: LogicalPlan): LogicalPlan = {
   ```





[GitHub] [spark] EnricoMi commented on a diff in pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
EnricoMi commented on code in PR #37551:
URL: https://github.com/apache/spark/pull/37551#discussion_r1073217390


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -119,13 +130,66 @@ class KeyValueGroupedDataset[K, V] private[sql](
         Project(groupingAttributes, logicalPlan)))
   }
 
+  /**
+   * Returns a new [[KeyValueGroupedDataset]] with each group sorted by the given expressions.
+   * Operations that provide an iterator that contains all of the elements in a group will
+   * then provide a sorted iterator (flatMapGroups, mapGroups, cogroup).
+   *
+   * This is not supported for streaming Datasets (mapGroupsWithState, flatMapGroupsWithState).
+   *
+   * @tparam S The type of the sort value. Must be encodable to Spark SQL types.
+   * @param sortBy A function that provides a sort value for each row.
+   * @param direction The sort direction.
+   *
+   * @since 3.4.0
+   */
+  def sortWithinGroups[S: Encoder](
+      sortBy: V => S, direction: SortDirection = Ascending): KeyValueGroupedDataset[K, V] = {

Review Comment:
   This was inspired by `groupByKey`. You are right, equality (`groupByKey`) and order are two different things. Since there is no equivalent `Dataset.sortBy` method, I'll remove this.
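
The difference between equality and order can be shown in plain Scala. This is an illustration only; the `Version` class and the chosen ordering are hypothetical. A case class gets `equals` and `hashCode` for free, which is all grouping needs, but no ordering is derived for it, so a sort has to be told explicitly how to compare values.

```scala
object EqualityVsOrder {
  // Hypothetical sort value type.
  final case class Version(major: Int, minor: Int)

  // Grouping only needs equals/hashCode, which a case class provides.
  def countByValue(vs: Seq[Version]): Map[Version, Int] =
    vs.groupBy(identity).map { case (v, occurrences) => v -> occurrences.size }

  // Sorting needs an explicit Ordering; this one is a choice, not a given.
  val byMajorThenMinor: Ordering[Version] =
    Ordering.by[Version, (Int, Int)](v => (v.major, v.minor))

  def sort(vs: Seq[Version]): Seq[Version] = vs.sorted(byMajorThenMinor)
}
```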





[GitHub] [spark] alasdairm-gr commented on pull request #37551: [SPARK-38591][SQL] Add flatMapSortedGroups and cogroupSorted to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
alasdairm-gr commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1218188619

   This would be massively useful to us.  Being able to iterate through groups in a sorted manner is a key use case for anyone who works with time series data.  It would be fantastic to have this supported in Spark.




[GitHub] [spark] EnricoMi commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
EnricoMi commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1386641052

   The nice thing about this approach is that you take a sorted grouped dataframe and can use it with the existing `cogroup` or `flatMapGroups` methods, without introducing new methods. The downside is that users might wrongly expect the sorting to also apply to aggregation functions. I thought the documentation on `sortWithinGroups` would be sufficient to make that clear:
   https://github.com/apache/spark/pull/37551/files#diff-3437bb4bcaf2e18c305978985e474daab11e397dc5f4666c13c8e11da0d7180b
   
       Returns a new [[KeyValueGroupedDataset]] with each group sorted by the given expressions.
       Operations that provide an iterator that contains all of the elements in a group will
       then provide a sorted iterator (flatMapGroups, mapGroups, cogroup).
   
   The alternative is to add `flatMapSortedGroups` and `cogroupSorted` with extra sorting parameters.




[GitHub] [spark] EnricoMi commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
EnricoMi commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1386698614

   Alright, moving to the `flatMapSortedGroups` and `cogroupSorted` approach then: #39640.




[GitHub] [spark] madwed-stripe commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
madwed-stripe commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1374715189

   This is exactly the functionality we need to use KeyValueGroupedDataset for larger scale datasets. It would be amazing to get this feature.




[GitHub] [spark] HeartSaVioR commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1386559000

   I like the idea - having a secondary sort key while we are sorting by grouping keys - but the direction does not seem to be right.
   
   As you mentioned in the PR description, calling this method only takes effect with a subsequent `flatMapGroups` or `cogroup`. That doesn't seem odd for those two, but there are other operations on `KeyValueGroupedDataset`, and users may expect the "same" sort order for them. For example, `agg` and `reduceGroups` can be sensitive to ordering, e.g. `first()` and `last()`.
   
   If the intention is to address (flat)MapGroups and cogroup specifically, addressing these APIs directly sounds to me like the more straightforward way to go. I guess you'd want to disallow sorting for streaming, but either 1) you can disallow it in the logical planning phase or 2) we can document that sorting is applied per microbatch in a streaming query.
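
The order sensitivity mentioned above can be seen in a minimal plain-Scala sketch. `firstAndLast` is a hypothetical helper that mimics what `first()` and `last()` style aggregations observe; it is not Spark code. The same set of values yields different results depending on the order in which the iterator delivers them.

```scala
object OrderSensitiveAgg {
  // Returns the first and last value seen, or None for an empty iterator.
  def firstAndLast(values: Iterator[Int]): Option[(Int, Int)] =
    if (!values.hasNext) None
    else {
      val first = values.next()
      var last = first
      while (values.hasNext) last = values.next()
      Some((first, last))
    }
}
```

A user who sorted the groups and then called such an aggregation would reasonably expect the sorted result, which is why a sort that only affects some operations could be surprising.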
   




[GitHub] [spark] cloud-fan commented on pull request #37551: [SPARK-38591][SQL] Add sortWithinGroups to KeyValueGroupedDataset

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #37551:
URL: https://github.com/apache/spark/pull/37551#issuecomment-1386662104

   I'd prefer `flatMapSortedGroups` and `cogroupSorted` with extra sorting parameters. `sortWithinGroups` is good for these group-related methods but not good for others.

