You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/04 08:16:35 UTC

[GitHub] [spark] HeartSaVioR edited a comment on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

HeartSaVioR edited a comment on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1029748381


   I confirmed that StreamingAggregation has same problem with stream-stream join problem described in SPARK-24588.
   
   Test code:
   
   ```
     test("simple count, update mode, check plan") {
       val inputData = MemoryStream[Int]
   
       val aggregated =
         inputData.toDF()
           .select('value as 'a, 'value * 2 as 'b)
           .repartition('b)
           .groupBy('a, 'b)
           .agg(count("*"))
           .as[(Int, Int, Long)]
   
       testStream(aggregated, Update)(
         AddData(inputData, 3),
         CheckLastBatch((3, 6, 1)),
         AddData(inputData, 3, 2),
         CheckLastBatch((3, 6, 2), (2, 4, 1)),
         StopStream,
         StartStream(),
         AddData(inputData, 3, 2, 1),
         CheckLastBatch((3, 6, 3), (2, 4, 2), (1, 2, 1)),
         // By default we run in new tuple mode.
         AddData(inputData, 4, 4, 4, 4),
         CheckLastBatch((4, 8, 4)),
         Execute { query =>
           logWarning(s"DEBUG: ${query.lastExecution.executedPlan}")
         }
       )
     }
   ```
   
   Output:
   
   ```
   16:52:16.736 WARN org.apache.spark.sql.streaming.StreamingAggregationSuite: DEBUG: WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@61581663, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2099/0x0000000800fcc040@650b39f1
   +- *(4) HashAggregate(keys=[a#3, b#4], functions=[count(1)], output=[a#3, b#4, count(1)#11L])
      +- StateStoreSave [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], Update, 0, 2
         +- *(3) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
            +- StateStoreRestore [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], 2
               +- *(2) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
                  +- *(2) HashAggregate(keys=[a#3, b#4], functions=[partial_count(1)], output=[a#3, b#4, count#78L])
                     +- Exchange hashpartitioning(b#4, 5), REPARTITION_BY_COL, [id=#580]
                        +- *(1) Project [value#1 AS a#3, (value#1 * 2) AS b#4]
                           +- MicroBatchScan[value#1] MemoryStreamDataSource
   ```
   
   Note that there was only a single shuffle performed via `.repartition('b)` and it satisfies the child distribution on a & b since the required distribution is ClusteredDistribution.
   
   While this seems OK and produces correct output, we can modify the query in various ways to break the query in further run - 1) remove `.repartition('b)` 2) replace it with `.repartition('a)` 3) replace it with `.repartition('a, 'b)` 4) replace it with `.repartition('b, 'a)`. All cases will satisfy `ClusteredDistribution("a", "b")` and does not trigger shuffle with hash partitioning.
   
   The problem persisted on **all stateful operators** (otherwise this PR had to touch more places). Since HashClusteredDistribution was introduced in SPARK-21865 (2.3.0), Spark 2.3.0+ would have this problem.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org