Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/08 00:07:40 UTC

[GitHub] [spark] c21 commented on a change in pull request #35419: [SPARK-38124][SQL][SS] Introduce StatefulOpClusteredDistribution and apply to all stateful operators

c21 commented on a change in pull request #35419:
URL: https://github.com/apache/spark/pull/35419#discussion_r801162163



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -337,7 +337,7 @@ case class StateStoreRestoreExec(
     if (keyExpressions.isEmpty) {
       AllTuples :: Nil
     } else {
-      ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
+      StatefulOpClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil

Review comment:
        I am wondering if this change could introduce an extra shuffle for streaming aggregates. Previously the operator required `ClusteredDistribution`, and now it requires `StatefulOpClusteredDistribution`/`HashClusteredDistribution`.
   
   `ClusteredDistribution` is more relaxed than `HashClusteredDistribution` in the sense that `HashPartitioning(c1)` can satisfy `ClusteredDistribution(c1, c2)` but cannot satisfy `HashClusteredDistribution(c1, c2)`. In short, `ClusteredDistribution` allows the child to be hash-partitioned on a subset of the required keys, so if the plan is already shuffled on a subset of the group-by columns, Spark will not add another shuffle before the group-by.
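   
   A minimal sketch of this `satisfies` behavior, exercising the Catalyst classes directly (using the pre-existing `HashClusteredDistribution`; the attribute names, types, and partition count here are illustrative):
   
   ```
   import org.apache.spark.sql.catalyst.expressions.AttributeReference
   import org.apache.spark.sql.catalyst.plans.physical._
   import org.apache.spark.sql.types.IntegerType
   
   val c1 = AttributeReference("c1", IntegerType)()
   val c2 = AttributeReference("c2", IntegerType)()
   
   // Child output hash-partitioned on a subset (c1) of the clustering keys (c1, c2).
   val childPartitioning = HashPartitioning(Seq(c1), numPartitions = 5)
   
   // A subset of the clustering keys is enough for ClusteredDistribution ...
   childPartitioning.satisfies(ClusteredDistribution(Seq(c1, c2)))      // true
   // ... but HashClusteredDistribution requires exactly the same keys.
   childPartitioning.satisfies(HashClusteredDistribution(Seq(c1, c2)))  // false
   ```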
   
   For example:
   
   ```
   MemoryStream[(Int, Int)].toDF()
     .repartition($"_1")        // pre-shuffled on a subset (_1) of the group-by keys
     .groupBy($"_1", $"_2")     // group-by keys are (_1, _2)
     .agg(count("*"))
     .as[(Int, Int, Long)]
   ```
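   
   For completeness, a hedged sketch of how one could actually run the query above and print the executed plan (the memory sink, query name, and sample data are my own assumptions, not from the original snippet):
   
   ```
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.functions.count
   import spark.implicits._
   implicit val sqlContext = spark.sqlContext
   
   val input = MemoryStream[(Int, Int)]
   val query = input.toDF()
     .repartition($"_1")
     .groupBy($"_1", $"_2")
     .agg(count("*"))
     .writeStream
     .outputMode("complete")    // matches the Complete mode in the plan below
     .format("memory")
     .queryName("agg_test")
     .start()
   
   input.addData((1, 2), (1, 3))
   query.processAllAvailable()
   query.explain()              // prints a physical plan like the one below
   ```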
   
   and the query plan:
   
   ```
   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5940f7c2, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1940/1200613952@4861dac3
   +- *(4) HashAggregate(keys=[_1#588, _2#589], functions=[count(1)], output=[_1#588, _2#589, count(1)#596L])
      +- StateStoreSave [_1#588, _2#589], state info [ checkpoint = file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/streaming.metadata-0d7cb004-92dd-4b0d-9d90-5a65c0d2934c/state, runId = 68598bd1-cf35-4bf7-a167-5f73dc9f4d84, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 1
         +- *(3) HashAggregate(keys=[_1#588, _2#589], functions=[merge_count(1)], output=[_1#588, _2#589, count#663L])
            +- StateStoreRestore [_1#588, _2#589], state info [ checkpoint = file:/private/var/folders/y5/hnsw8mz93vs57ngcd30y6y9c0000gn/T/streaming.metadata-0d7cb004-92dd-4b0d-9d90-5a65c0d2934c/state, runId = 68598bd1-cf35-4bf7-a167-5f73dc9f4d84, opId = 0, ver = 0, numPartitions = 5], 1
               +- *(2) HashAggregate(keys=[_1#588, _2#589], functions=[merge_count(1)], output=[_1#588, _2#589, count#663L])
                  +- *(2) HashAggregate(keys=[_1#588, _2#589], functions=[partial_count(1)], output=[_1#588, _2#589, count#663L])
                     +- Exchange hashpartitioning(_1#588, 5), REPARTITION_BY_COL, [id=#2008]
                        +- *(1) Project [_1#588, _2#589]
                           +- MicroBatchScan[_1#588, _2#589] MemoryStreamDataSource
   ```
   
   Note that the only `Exchange` in this plan is `hashpartitioning(_1#588, 5)`; no additional shuffle on `(_1, _2)` is added before `StateStoreRestore`. One can argue the previous behavior for streaming aggregates is not wrong: as long as all rows for the same keys are colocated in the same partition, `StateStoreRestore`/`StateStoreSave` should output the correct answer. But if we make the change here, I assume one extra shuffle on `($"_1", $"_2")` would be introduced, and it might yield an incorrect result when running the new query plan against the existing state store?
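   
   A toy illustration of that last concern (plain Scala hashing, not Spark's actual Murmur3-based partitioner): the partition a row lands in depends on which columns are hashed, so state laid out by `hashpartitioning(_1)` would generally be read by the wrong task under `hashpartitioning(_1, _2)`.
   
   ```
   // Toy stand-in for a hash partitioner (illustrative only).
   def partitionOf(key: Any, numPartitions: Int): Int =
     ((key.hashCode % numPartitions) + numPartitions) % numPartitions
   
   val row = (1, 2)
   partitionOf(row._1, 5)            // partition id when hashing only _1
   partitionOf((row._1, row._2), 5)  // partition id when hashing (_1, _2)
   // The two ids generally differ, so per-partition state written under the
   // old layout is not aligned with the tasks of the new layout.
   ```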




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


