You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/11 17:12:46 UTC
[spark] branch master updated: [SPARK-26300][SS] Remove a redundant
`checkForStreaming` call
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d811369 [SPARK-26300][SS] Remove a redundant `checkForStreaming` call
d811369 is described below
commit d811369ce23186cbb3208ad665e15408e13fea87
Author: liuxian <li...@zte.com.cn>
AuthorDate: Tue Dec 11 09:12:17 2018 -0800
[SPARK-26300][SS] Remove a redundant `checkForStreaming` call
## What changes were proposed in this pull request?
If `checkForContinuous` is called ( `checkForStreaming` is called in `checkForContinuous` ), the `checkForStreaming` mothod will be called twice in `createQuery` , this is not necessary, and the `checkForStreaming` method has a lot of statements, so it's better to remove one of them.
## How was this patch tested?
Existing unit tests in `StreamingQueryManagerSuite` and `ContinuousAggregationSuite`
Closes #23251 from 10110346/isUnsupportedOperationCheckEnabled.
Authored-by: liuxian <li...@zte.com.cn>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/sql/streaming/StreamingQueryManager.scala | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index d9fe1a9..881cd96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -246,9 +246,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
- if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
- UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
- }
+ val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
@@ -257,7 +255,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
(sink, trigger) match {
case (v2Sink: StreamingWriteSupportProvider, trigger: ContinuousTrigger) =>
- if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
+ if (operationCheckEnabled) {
UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
}
new StreamingQueryWrapper(new ContinuousExecution(
@@ -272,6 +270,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
extraOptions,
deleteCheckpointOnStop))
case _ =>
+ if (operationCheckEnabled) {
+ UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+ }
new StreamingQueryWrapper(new MicroBatchExecution(
sparkSession,
userSpecifiedName.orNull,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org