Posted to reviews@spark.apache.org by "Hisoka-X (via GitHub)" <gi...@apache.org> on 2023/07/14 10:39:14 UTC

[GitHub] [spark] Hisoka-X commented on a diff in pull request #41958: [SPARK-44386][SQL] Use PartitionEvaluator API in HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec

Hisoka-X commented on code in PR #41958:
URL: https://github.com/apache/spark/pull/41958#discussion_r1263581572


##########
core/src/main/scala/org/apache/spark/Dependency.scala:
##########
@@ -76,7 +75,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  */
 @DeveloperApi
 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
-    @transient private val _rdd: RDD[_ <: Product2[K, V]],
+    private val _rdd: RDD[_ <: Product2[K, V]],

Review Comment:
   We need to ship the RDD inside `ShuffleDependency` to the executor node so that all of its metrics get registered there. Take `ObjectHashAggregateExec` as an example. This problem was not exposed before because we used to send a lambda as the task: the lambda captured the `ObjectHashAggregateExec` object, which in turn referenced a child `ObjectHashAggregateExec` (because of partial aggregation, `ObjectHashAggregateExec` is split into two stages around the shuffle, and it is the child's metrics that go missing). The child `ObjectHashAggregateExec` therefore rode along with the lambda and brought its metrics to the executor node. Now that the logic has been extracted into the `EvaluatorFactory`, the lambda carries only the `EvaluatorFactory` object, so the child `ObjectHashAggregateExec`'s metrics are no longer serialized with the task and some metrics end up missing on the executor side.
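   
   To make the difference concrete, here is a tiny standalone sketch (the class names below are invented for illustration, they are not Spark's real classes): when the task is a closure over the operator, serializing the closure drags the whole operator graph along, including the child operator and its metric objects; when the task carries only a factory, nothing reaches the executor unless the factory holds it explicitly.
   ```scala
   // Toy stand-ins for an operator, its metrics, and an evaluator factory.
   final case class ToyMetric(name: String)
   
   class ToyOperator(val metrics: Map[String, ToyMetric],
                     val child: Option[ToyOperator]) extends Serializable {
   
     // "Before": the task is a closure; referencing the `metrics` field captures
     // `this`, so the operator, its child, and both metric maps are all
     // serialized with the task.
     def taskAsClosure: Iterator[Int] => Iterator[Int] = { rows =>
       val m = metrics
       rows.map(_ + m.size)
     }
   
     // "After": the task carries only the factory; the child's metrics never
     // leave the driver unless they are handed to the factory explicitly.
     def taskAsFactory: ToyEvaluatorFactory = new ToyEvaluatorFactory(metrics)
   }
   
   class ToyEvaluatorFactory(metrics: Map[String, ToyMetric]) extends Serializable {
     def createEvaluator(): Iterator[Int] => Iterator[Int] =
       rows => rows.map(_ + metrics.size)
   }
   ```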
   
   The lambda data that is transferred after introducing `EvaluatorFactory`:
   ![image](https://github.com/apache/spark/assets/32387433/e883c7de-d6ea-458e-8e68-f58c7c6d9711)
   
   The lambda data that was transferred before introducing `EvaluatorFactory`:
   ![image](https://github.com/apache/spark/assets/32387433/6d333f45-0e46-4c0c-a978-2352cd099064)
   
   This is the information we need:
   ![image](https://github.com/apache/spark/assets/32387433/3735a487-3088-4867-a7ce-117eccb72398)
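   
   (If anyone wants to poke at this locally: one rough way to see what a task closure drags along is simply to Java-serialize it and look at the size or the bytes. The helper below is only an illustration of that idea, not the code used for the screenshots above.)
   ```scala
   import java.io.{ByteArrayOutputStream, ObjectOutputStream}
   
   // Java-serialize an object and report how many bytes it takes. A closure that
   // captures a whole operator tree is noticeably bigger than one that captures
   // only an evaluator factory.
   def serializedSize(obj: AnyRef): Int = {
     val buffer = new ByteArrayOutputStream()
     val out = new ObjectOutputStream(buffer)
     out.writeObject(obj)
     out.close()
     buffer.size()
   }
   
   // A plain lambda that captures nothing serializes to a handful of bytes.
   val smallTask: Iterator[Int] => Iterator[Int] = rows => rows.map(_ + 1)
   println(serializedSize(smallTask))
   ```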
   
   The physical plan for the `ObjectHashAggregate` query:
   ```log
   ObjectHashAggregate(keys=[a#3], functions=[collect_set(a#3, 0, 0)], output=[a#3, collect_set(a)#67])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [plan_id=62]
      +- ObjectHashAggregate(keys=[a#3], functions=[partial_collect_set(a#3, 0, 0)], output=[a#3, buf#71])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
            +- Scan[obj#2]
   ```
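   
   For reference, a query of roughly this shape (assuming a small two-column `a`/`b` dataset standing in for the `TestData2` fixture referenced in the plan) produces such a partial/final `ObjectHashAggregate` pair, since `collect_set` is a `TypedImperativeAggregate` and therefore takes the object hash aggregation path by default:
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.collect_set
   
   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   import spark.implicits._
   
   // Two-column dataset standing in for the TestData2 fixture used in the plan above.
   val df = Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1)).toDF("a", "b")
   
   // collect_set is backed by a TypedImperativeAggregate, so it is planned with
   // ObjectHashAggregateExec: partial aggregation, shuffle, final aggregation.
   df.groupBy("a").agg(collect_set($"a")).explain()
   ```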
   
   cc @cloud-fan @MaxGekk @viirya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org