You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2020/02/03 07:39:02 UTC

[spark] branch branch-3.0 updated: [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f9b8637  [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf
f9b8637 is described below

commit f9b86370cb04b72a4f00cbd4d60873960aa2792c
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Sun Feb 2 23:37:13 2020 -0800

    [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf
    
    ### What changes were proposed in this pull request?
    Put the configs below needed by Structured Streaming UI into StaticSQLConf:
    
    - spark.sql.streaming.ui.enabled
    - spark.sql.streaming.ui.retainedProgressUpdates
    - spark.sql.streaming.ui.retainedQueries
    
    ### Why are the changes needed?
    Make all SS UI configs consistent with other similar configs in usage and naming.
    
    ### Does this PR introduce any user-facing change?
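    Yes, this adds the new static config `spark.sql.streaming.ui.retainedProgressUpdates` and renames `spark.sql.streaming.ui.numInactiveQueries` to `spark.sql.streaming.ui.retainedQueries`.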
    
    ### How was this patch tested?
    Existing UT.
    
    Closes #27425 from xuanyuanking/SPARK-29543-follow.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
    (cherry picked from commit a4912cee615314e9578e6ab4eae25f147feacbd5)
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala      | 16 ----------------
 .../apache/spark/sql/internal/StaticSQLConf.scala    | 20 ++++++++++++++++++++
 .../org/apache/spark/sql/internal/SharedState.scala  | 15 ++++++++-------
 .../streaming/ui/StreamingQueryStatusListener.scala  | 10 ++++++----
 .../spark/sql/streaming/ui/StreamingQueryTab.scala   |  2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala       |  4 ++--
 6 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04572c3..3ad3416 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1150,18 +1150,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val STREAMING_UI_ENABLED =
-    buildConf("spark.sql.streaming.ui.enabled")
-      .doc("Whether to run the structured streaming UI for the Spark application.")
-      .booleanConf
-      .createWithDefault(true)
-
-  val STREAMING_UI_INACTIVE_QUERY_RETENTION =
-    buildConf("spark.sql.streaming.ui.numInactiveQueries")
-      .doc("The number of inactive queries to retain for structured streaming ui.")
-      .intConf
-      .createWithDefault(100)
-
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
@@ -2284,10 +2272,6 @@ class SQLConf extends Serializable with Logging {
 
   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
 
-  def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
-
-  def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
-
   def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
 
   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 66ac9ddb..6bc7522 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -176,4 +176,24 @@ object StaticSQLConf {
       .internal()
       .booleanConf
       .createWithDefault(true)
+
+  val STREAMING_UI_ENABLED =
+    buildStaticConf("spark.sql.streaming.ui.enabled")
+      .doc("Whether to run the Structured Streaming Web UI for the Spark application when the " +
+        "Spark Web UI is enabled.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val STREAMING_UI_RETAINED_PROGRESS_UPDATES =
+    buildStaticConf("spark.sql.streaming.ui.retainedProgressUpdates")
+      .doc("The number of progress updates to retain for a streaming query for Structured " +
+        "Streaming UI.")
+      .intConf
+      .createWithDefault(100)
+
+  val STREAMING_UI_RETAINED_QUERIES =
+    buildStaticConf("spark.sql.streaming.ui.retainedQueries")
+      .doc("The number of inactive queries to retain for Structured Streaming UI.")
+      .intConf
+      .createWithDefault(100)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index fefd72d..5347264 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -145,13 +145,14 @@ private[sql] class SharedState(
    * data to show.
    */
   lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
-    val sqlConf = SQLConf.get
-    if (sqlConf.isStreamingUIEnabled) {
-      val statusListener = new StreamingQueryStatusListener(sqlConf)
-      sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
-      Some(statusListener)
-    } else {
-      None
+    sparkContext.ui.flatMap { ui =>
+      if (conf.get(STREAMING_UI_ENABLED)) {
+        val statusListener = new StreamingQueryStatusListener(conf)
+        new StreamingQueryTab(statusListener, ui)
+        Some(statusListener)
+      } else {
+        None
+      }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index db085db..9181511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -24,8 +24,9 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
 
 /**
@@ -33,7 +34,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro
  * UI data for both active and inactive query.
  * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -45,8 +46,9 @@ private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends Stream
   private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
   private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
 
-  private val streamingProgressRetention = sqlConf.streamingProgressRetention
-  private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
+  private val streamingProgressRetention =
+    conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
+  private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     activeQueryStatus.putIfAbsent(event.runId,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index f909cfd..bb097ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -34,6 +34,6 @@ private[sql] class StreamingQueryTab(
   parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
 }
 
-object StreamingQueryTab {
+private[sql] object StreamingQueryTab {
   private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index bd74ed3..adbb501 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming
 class StreamingQueryStatusListenerSuite extends StreamTest {
 
   test("onQueryStarted, onQueryProgress, onQueryTerminated") {
-    val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+    val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
 
     // hanlde query started event
     val id = UUID.randomUUID()
@@ -73,7 +73,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
   }
 
   test("same query start multiple times") {
-    val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+    val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
 
     // handle first time start
     val id = UUID.randomUUID()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org