You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2020/02/03 07:37:58 UTC
[spark] branch master updated: [SPARK-29543][SS][FOLLOWUP] Move
`spark.sql.streaming.ui.*` configs to StaticSQLConf
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a4912ce [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf
a4912ce is described below
commit a4912cee615314e9578e6ab4eae25f147feacbd5
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Sun Feb 2 23:37:13 2020 -0800
[SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf
### What changes were proposed in this pull request?
Move the following configs used by the Structured Streaming UI into StaticSQLConf:
- spark.sql.streaming.ui.enabled
- spark.sql.streaming.ui.retainedProgressUpdates
- spark.sql.streaming.ui.retainedQueries
### Why are the changes needed?
Make all Structured Streaming (SS) UI configs consistent with other similar configs in usage and naming.
### Does this PR introduce any user-facing change?
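Yes. This adds the new static config `spark.sql.streaming.ui.retainedProgressUpdates`. It also renames `spark.sql.streaming.ui.numInactiveQueries` to `spark.sql.streaming.ui.retainedQueries`. The Structured Streaming UI configs are now static SQL configs, so they can no longer be changed per session at runtime.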
### How was this patch tested?
Existing unit tests.
Closes #27425 from xuanyuanking/SPARK-29543-follow.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 16 ----------------
.../apache/spark/sql/internal/StaticSQLConf.scala | 20 ++++++++++++++++++++
.../org/apache/spark/sql/internal/SharedState.scala | 15 ++++++++-------
.../streaming/ui/StreamingQueryStatusListener.scala | 10 ++++++----
.../spark/sql/streaming/ui/StreamingQueryTab.scala | 2 +-
.../ui/StreamingQueryStatusListenerSuite.scala | 4 ++--
6 files changed, 37 insertions(+), 30 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04572c3..3ad3416 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1150,18 +1150,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- val STREAMING_UI_ENABLED =
- buildConf("spark.sql.streaming.ui.enabled")
- .doc("Whether to run the structured streaming UI for the Spark application.")
- .booleanConf
- .createWithDefault(true)
-
- val STREAMING_UI_INACTIVE_QUERY_RETENTION =
- buildConf("spark.sql.streaming.ui.numInactiveQueries")
- .doc("The number of inactive queries to retain for structured streaming ui.")
- .intConf
- .createWithDefault(100)
-
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
@@ -2284,10 +2272,6 @@ class SQLConf extends Serializable with Logging {
def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
- def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
-
- def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
-
def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 66ac9ddb..6bc7522 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -176,4 +176,24 @@ object StaticSQLConf {
.internal()
.booleanConf
.createWithDefault(true)
+
+ val STREAMING_UI_ENABLED =
+ buildStaticConf("spark.sql.streaming.ui.enabled")
+ .doc("Whether to run the Structured Streaming Web UI for the Spark application when the " +
+ "Spark Web UI is enabled.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STREAMING_UI_RETAINED_PROGRESS_UPDATES =
+ buildStaticConf("spark.sql.streaming.ui.retainedProgressUpdates")
+ .doc("The number of progress updates to retain for a streaming query for Structured " +
+ "Streaming UI.")
+ .intConf
+ .createWithDefault(100)
+
+ val STREAMING_UI_RETAINED_QUERIES =
+ buildStaticConf("spark.sql.streaming.ui.retainedQueries")
+ .doc("The number of inactive queries to retain for Structured Streaming UI.")
+ .intConf
+ .createWithDefault(100)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index fefd72d..5347264 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -145,13 +145,14 @@ private[sql] class SharedState(
* data to show.
*/
lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
- val sqlConf = SQLConf.get
- if (sqlConf.isStreamingUIEnabled) {
- val statusListener = new StreamingQueryStatusListener(sqlConf)
- sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
- Some(statusListener)
- } else {
- None
+ sparkContext.ui.flatMap { ui =>
+ if (conf.get(STREAMING_UI_ENABLED)) {
+ val statusListener = new StreamingQueryStatusListener(conf)
+ new StreamingQueryTab(statusListener, ui)
+ Some(statusListener)
+ } else {
+ None
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index db085db..9181511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -24,8 +24,9 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
/**
@@ -33,7 +34,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro
* UI data for both active and inactive query.
* TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -45,8 +46,9 @@ private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends Stream
private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
- private val streamingProgressRetention = sqlConf.streamingProgressRetention
- private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
+ private val streamingProgressRetention =
+ conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
+ private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
activeQueryStatus.putIfAbsent(event.runId,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index f909cfd..bb097ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -34,6 +34,6 @@ private[sql] class StreamingQueryTab(
parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
}
-object StreamingQueryTab {
+private[sql] object StreamingQueryTab {
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index bd74ed3..adbb501 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming
class StreamingQueryStatusListenerSuite extends StreamTest {
test("onQueryStarted, onQueryProgress, onQueryTerminated") {
- val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+ val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
// hanlde query started event
val id = UUID.randomUUID()
@@ -73,7 +73,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
}
test("same query start multiple times") {
- val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+ val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
// handle first time start
val id = UUID.randomUUID()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org