You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/29 04:37:50 UTC
[spark] branch branch-3.0 updated: [SPARK-32126][SS] Scope Session.active in IncrementalExecution
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 51b516a [SPARK-32126][SS] Scope Session.active in IncrementalExecution
51b516a is described below
commit 51b516a62488f1740c3e4ef337db6703cbc36382
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Sun Jun 28 21:35:59 2020 -0700
[SPARK-32126][SS] Scope Session.active in IncrementalExecution
### What changes were proposed in this pull request?
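The `optimizedPlan` in `IncrementalExecution` should also be evaluated inside `sparkSession.withActive`. It now goes through `QueryExecution.executePhase` (made `protected` so the subclass can call it) instead of calling `tracker.measurePhase` directly.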
### Why are the changes needed?
Follow-up of SPARK-30798 for the Streaming side.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Closes #28936 from xuanyuanking/SPARK-30798-follow.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit f944603872284c03c557474bb9e816f20094a630)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 2 +-
.../org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 1df812d..50c9c31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -129,7 +129,7 @@ class QueryExecution(
Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
}
- private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
+ protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
tracker.measurePhase(phase)(block)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 09ae769..7773ac7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -76,7 +76,7 @@ class IncrementalExecution(
* with the desired literal
*/
override
- lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
+ lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
tracker) transformAllExpressions {
case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org