You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/29 04:37:50 UTC

[spark] branch branch-3.0 updated: [SPARK-32126][SS] Scope Session.active in IncrementalExecution

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 51b516a  [SPARK-32126][SS] Scope Session.active in IncrementalExecution
51b516a is described below

commit 51b516a62488f1740c3e4ef337db6703cbc36382
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Sun Jun 28 21:35:59 2020 -0700

    [SPARK-32126][SS] Scope Session.active in IncrementalExecution
    
    ### What changes were proposed in this pull request?
    
    The `optimizedPlan` in IncrementalExecution should also be scoped in `withActive`.
    
    ### Why are the changes needed?
    
    Follow-up of SPARK-30798 for the Streaming side.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UT.
    
    Closes #28936 from xuanyuanking/SPARK-30798-follow.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit f944603872284c03c557474bb9e816f20094a630)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala  | 2 +-
 .../org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 1df812d..50c9c31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -129,7 +129,7 @@ class QueryExecution(
       Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
   }
 
-  private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
+  protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
     tracker.measurePhase(phase)(block)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 09ae769..7773ac7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -76,7 +76,7 @@ class IncrementalExecution(
    * with the desired literal
    */
   override
-  lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
+  lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
     sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
       tracker) transformAllExpressions {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org