You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/11 17:12:46 UTC

[spark] branch master updated: [SPARK-26300][SS] Remove a redundant `checkForStreaming` call

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d811369  [SPARK-26300][SS] Remove a redundant `checkForStreaming` call
d811369 is described below

commit d811369ce23186cbb3208ad665e15408e13fea87
Author: liuxian <li...@zte.com.cn>
AuthorDate: Tue Dec 11 09:12:17 2018 -0800

    [SPARK-26300][SS] Remove a redundant `checkForStreaming` call
    
    ## What changes were proposed in this pull request?
    If `checkForContinuous` is called (it calls `checkForStreaming` internally), the `checkForStreaming` method will be called twice in `createQuery`. This is unnecessary, and since the `checkForStreaming` method has a lot of statements, it is better to remove one of the calls.
    
    ## How was this patch tested?
    
    Existing unit tests in `StreamingQueryManagerSuite` and `ContinuousAggregationSuite`
    
    Closes #23251 from 10110346/isUnsupportedOperationCheckEnabled.
    
    Authored-by: liuxian <li...@zte.com.cn>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/sql/streaming/StreamingQueryManager.scala   | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index d9fe1a9..881cd96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -246,9 +246,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()
 
-    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
-      UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
-    }
+    val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
 
     if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
       logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
@@ -257,7 +255,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
     (sink, trigger) match {
       case (v2Sink: StreamingWriteSupportProvider, trigger: ContinuousTrigger) =>
-        if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
+        if (operationCheckEnabled) {
           UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
         }
         new StreamingQueryWrapper(new ContinuousExecution(
@@ -272,6 +270,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
           extraOptions,
           deleteCheckpointOnStop))
       case _ =>
+        if (operationCheckEnabled) {
+          UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+        }
         new StreamingQueryWrapper(new MicroBatchExecution(
           sparkSession,
           userSpecifiedName.orNull,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org