Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/18 20:26:37 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

HeartSaVioR opened a new pull request, #38717:
URL: https://github.com/apache/spark/pull/38717

   ### What changes were proposed in this pull request?
   
   This PR proposes to fix the broken metrics when a streaming query contains a CTE, by manually applying InlineCTE to the analyzed plan when collecting metrics.
   
   Suppose a streaming query contains the following subquery as the batch side of a join with a streaming source:
   
   ```
   with batch_tbl as (
     SELECT col1, col2 FROM parquet_tbl
   )
   
   SELECT col1 AS key, col2 as value_batch FROM batch_tbl
   ```
   
   Currently, Spark adds a WithCTE node with CTERelationDef and CTERelationRef whenever a CTE is used. Below is the analyzed plan:
   
   ```
   WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
   +- Project [key#15, value_stream#16, value_batch#9L]
      +- Join Inner, (cast(key#15 as bigint) = key#8L)
         :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
         :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
         :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
         +- WithCTE
            :- CTERelationDef 0, false
            :  +- SubqueryAlias batch_tbl
            :     +- Project [col1#10L, col2#11L]
            :        +- SubqueryAlias spark_catalog.default.parquet_tbl
            :           +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
            +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
               +- SubqueryAlias batch_tbl
                  +- CTERelationRef 0, true, [col1#10L, col2#11L]
   ```
   
   Here, the plan has 3 leaf nodes, but only 2 of them are actual sources. During optimization the CTE is inlined, leaving 2 leaf nodes. Below is the optimized plan:
   
   ```
   WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@622c7c7f]
   +- Project [key#55, value_stream#56, value_batch#9L]
      +- Join Inner, (cast(key#55 as bigint) = key#8L)
         :- Filter isnotnull(key#55)
         :  +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
         +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
            +- Filter isnotnull(col1#10L)
               +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
   ```
   
   Hence the executed plan will also have 2 leaf nodes, which does not match the number of leaf nodes in the analyzed plan, so ProgressReporter gives up collecting metrics.
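   
   For context, here is a simplified sketch of the association step in ProgressReporter that breaks here. Variable names follow the diff quoted later in this thread; the `leavesMatch` helper itself is made up for illustration, not the exact code.
   
   ```scala
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.execution.SparkPlan
   
   // Positional association between logical and physical leaves only works
   // when the counts line up; with the CTE plan above they are 3 vs 2.
   def leavesMatch(logical: LogicalPlan, executed: SparkPlan): Boolean = {
     val allLogicalPlanLeaves = logical.collectLeaves() // includes non-streaming
     val allExecPlanLeaves = executed.collectLeaves()
     allLogicalPlanLeaves.size == allExecPlanLeaves.size
   }
   ```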
   
   Applying InlineCTE to the analyzed plan while collecting metrics resolves this. For example, below is the logical plan obtained by applying InlineCTE to the analyzed plan above (a code sketch of the idea follows the plan):
   
   ```
   WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
   +- Project [key#15, value_stream#16, value_batch#9L]
      +- Join Inner, (cast(key#15 as bigint) = key#8L)
         :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
         :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
         :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
         +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
            +- SubqueryAlias batch_tbl
               +- SubqueryAlias batch_tbl
                  +- Project [col1#10L, col2#11L]
                     +- SubqueryAlias spark_catalog.default.parquet_tbl
                        +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
   ```
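   
   A minimal sketch of that fix, assuming InlineCTE's alwaysInline flag (illustrative, not the exact patch):
   
   ```scala
   import org.apache.spark.sql.catalyst.optimizer.InlineCTE
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   
   // Inline CTE definitions into the analyzed plan before counting leaves,
   // so the leaf count lines up with the optimized/executed plan.
   def leavesAfterInlining(analyzed: LogicalPlan): Seq[LogicalPlan] =
     InlineCTE(alwaysInline = true).apply(analyzed).collectLeaves()
   ```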
   
   Note that this is only required when the streaming query contains at least one DSv1 streaming source. If the query only uses DSv2 streaming sources, ProgressReporter can simply pick up the dedicated physical node(s) from the executed plan.
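   
   A hedged sketch of that DSv2 path ("numOutputRows" is the standard scan metric name; treat the details as an approximation):
   
   ```scala
   import org.apache.spark.sql.connector.read.streaming.SparkDataStream
   import org.apache.spark.sql.execution.SparkPlan
   import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec
   
   // Each DSv2 streaming source has a dedicated physical scan node, so it
   // can be collected straight from the executed plan.
   def dsv2InputRows(executed: SparkPlan): Map[SparkDataStream, Long] =
     executed.collect {
       case s: MicroBatchScanExec =>
         (s.stream: SparkDataStream) -> s.metrics("numOutputRows").value
     }.toMap
   ```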
   
   ### Why are the changes needed?
   
   The metrics in a streaming query are broken if the query contains a CTE.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   New test case.
   




[GitHub] [spark] HeartSaVioR commented on pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38717:
URL: https://github.com/apache/spark/pull/38717#issuecomment-1320531472

   cc. @zsxwing @cloud-fan @viirya Please take a look. Thanks in advance!




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38717:
URL: https://github.com/apache/spark/pull/38717#discussion_r1027251716


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -341,7 +355,13 @@ trait ProgressReporter extends Logging {
       val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
         logicalPlan.collectLeaves().map { leaf => leaf -> source }
       }
-      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming

Review Comment:
   I admit I'm not 100% sure of the historical reason for picking the logical plan rather than the optimized plan. My guess is that it enables comparing the "logical node in newData" with the "leaf node in the logical plan", since the optimizer may change leaf nodes.
   
   That said, this approach is best effort and will never be perfect. Say, if the optimizer rewrites a self-union into an aggregation, the optimized plan will have one less leaf node, which breaks the mechanism. If the optimizer swaps subtrees, which ends up swapping leaf nodes, it could lead to incorrect metrics.
   
   Ideally we should either 1) move all streaming sources to DSv2 or 2) have a dedicated logical and physical node for streaming DSv1 sources, but neither seems easy to achieve.
   
   Another possible idea is to assign a UUID in a node tag for association and retain the tag even as the optimizer applies rules. If the tag could be propagated to the physical plan, even better. (If that is feasible, we could simply collect the nodes carrying the tag from the executed plan and match them with sources.)
   
   I could explore the idea, but it would take time, and I'm not sure whether it is feasible. Do you think the idea makes sense, or does it go against how Spark optimization/physical rules work? A rough sketch of what I mean follows.
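   
   (Hypothetical sketch only, not part of this PR; it assumes tags would survive optimizer rules and planning, which is exactly the open question. The tag name is made up.)
   
   ```scala
   import java.util.UUID
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.catalyst.trees.TreeNodeTag
   import org.apache.spark.sql.execution.SparkPlan
   
   object SourceTagSketch {
     // Hypothetical tag for associating leaves with streaming sources.
     val sourceIdTag: TreeNodeTag[UUID] = TreeNodeTag[UUID]("streamingSourceId")
   
     // At analysis time: attach a UUID to the logical leaf of each source.
     def tagLeaf(leaf: LogicalPlan): Unit =
       leaf.setTagValue(sourceIdTag, UUID.randomUUID())
   
     // When collecting metrics: look the tag up again on the executed plan.
     def taggedPhysicalNodes(executed: SparkPlan): Seq[(UUID, SparkPlan)] =
       executed.collect {
         case p if p.getTagValue(sourceIdTag).isDefined =>
           p.getTagValue(sourceIdTag).get -> p
       }
   }
   ```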





[GitHub] [spark] HeartSaVioR closed pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source
URL: https://github.com/apache/spark/pull/38717




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38717:
URL: https://github.com/apache/spark/pull/38717#discussion_r1027251960


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -341,7 +355,13 @@ trait ProgressReporter extends Logging {
       val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
         logicalPlan.collectLeaves().map { leaf => leaf -> source }
       }
-      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming

Review Comment:
   For me, chasing the metrics issue in general is beyond the scope for now, although it is something we should deal with eventually, sooner rather than later.





[GitHub] [spark] HeartSaVioR commented on pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38717:
URL: https://github.com/apache/spark/pull/38717#issuecomment-1321250199

   Thanks for understanding. Let's go with the no-risk fix for now, and take more time to think about a holistic fix.






[GitHub] [spark] viirya commented on a diff in pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #38717:
URL: https://github.com/apache/spark/pull/38717#discussion_r1027237826


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -341,7 +355,13 @@ trait ProgressReporter extends Logging {
       val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
         logicalPlan.collectLeaves().map { leaf => leaf -> source }
       }
-      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming

Review Comment:
   Why don't we just use the optimized logical plan here?





[GitHub] [spark] HeartSaVioR commented on pull request #38717: [SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 streaming source

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38717:
URL: https://github.com/apache/spark/pull/38717#issuecomment-1321250486

   Thanks! Merging to master.

