You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2020/12/03 01:13:23 UTC
[spark] branch master updated: [SPARK-31953][SS] Add Spark
Structured Streaming History Server Support
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4f96670 [SPARK-31953][SS] Add Spark Structured Streaming History Server Support
4f96670 is described below
commit 4f9667035886a67e6c9a4e8fad2efa390e87ca68
Author: uncleGen <hu...@gmail.com>
AuthorDate: Wed Dec 2 17:11:51 2020 -0800
[SPARK-31953][SS] Add Spark Structured Streaming History Server Support
### What changes were proposed in this pull request?
Add Spark Structured Streaming History Server Support.
### Why are the changes needed?
Add a streaming query history server plugin.
![image](https://user-images.githubusercontent.com/7402327/84248291-d26cfe80-ab3b-11ea-86d2-98205fa2bcc4.png)
![image](https://user-images.githubusercontent.com/7402327/84248347-e44ea180-ab3b-11ea-81de-eefe207656f2.png)
![image](https://user-images.githubusercontent.com/7402327/84248396-f0d2fa00-ab3b-11ea-9b0d-e410115471b0.png)
- Follow-ups
- Query duration should not update in history UI.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Update UT.
Closes #28781 from uncleGen/SPARK-31953.
Lead-authored-by: uncleGen <hu...@gmail.com>
Co-authored-by: Genmao Yu <hu...@gmail.com>
Co-authored-by: Yuanjian Li <yu...@databricks.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
dev/.rat-excludes | 1 +
.../org.apache.spark.status.AppHistoryServerPlugin | 1 +
.../streaming/StreamingQueryListenerBus.scala | 26 +++-
.../ui/StreamingQueryHistoryServerPlugin.scala | 43 ++++++
.../execution/ui/StreamingQueryStatusStore.scala | 53 +++++++
.../apache/spark/sql/internal/SharedState.scala | 8 +-
.../sql/streaming/StreamingQueryManager.scala | 3 +-
.../sql/streaming/ui/StreamingQueryPage.scala | 44 +++---
.../ui/StreamingQueryStatisticsPage.scala | 27 ++--
.../ui/StreamingQueryStatusListener.scala | 166 +++++++++++++--------
.../spark/sql/streaming/ui/StreamingQueryTab.scala | 3 +-
.../apache/spark/sql/streaming/ui/UIUtils.scala | 12 +-
.../resources/spark-events/local-1596020211915 | 160 ++++++++++++++++++++
.../org/apache/spark/deploy/history/Utils.scala} | 39 ++---
.../streaming/ui/StreamingQueryHistorySuite.scala | 63 ++++++++
.../sql/streaming/ui/StreamingQueryPageSuite.scala | 42 +++---
.../ui/StreamingQueryStatusListenerSuite.scala | 159 ++++++++++++++++----
17 files changed, 673 insertions(+), 177 deletions(-)
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 7da330d..167cf22 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -123,6 +123,7 @@ SessionHandler.java
GangliaReporter.java
application_1578436911597_0052
config.properties
+local-1596020211915
app-20200706201101-0003
py.typed
_metadata
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
index 0bba2f8..6771eef 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -1 +1,2 @@
org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin
+org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 1b8d69f..4b98acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -31,16 +31,21 @@ import org.apache.spark.util.ListenerBus
* Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
* to StreamingQueryListeners.
*
- * Note that each bus and its registered listeners are associated with a single SparkSession
+ * Note 1: Each bus and its registered listeners are associated with a single SparkSession
* and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
* those queries that were started in the associated SparkSession.
+ *
+ * Note 2: To rebuild the Structured Streaming UI in the Spark History Server (SHS), this bus is
+ * registered into [[org.apache.spark.scheduler.ReplayListenerBus]]. Whether `sparkListenerBus` is
+ * defined determines how a [[StreamingQueryListener.Event]] is processed: if it is empty, this
+ * bus is being used to replay all streaming query events from the event log.
*/
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
import StreamingQueryListener._
- sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
+ sparkListenerBus.foreach(_.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY))
/**
* RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -67,11 +72,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
event match {
case s: QueryStartedEvent =>
activeQueryRunIds.synchronized { activeQueryRunIds += s.runId }
- sparkListenerBus.post(s)
+ sparkListenerBus.foreach(bus => bus.post(s))
// post to local listeners to trigger callbacks
postToAll(s)
case _ =>
- sparkListenerBus.post(event)
+ sparkListenerBus.foreach(bus => bus.post(event))
}
}
@@ -95,7 +100,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
// synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
// thread
- if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
+ //
+ // When loaded by Spark History Server, we should process all events coming from the replay
+ // listener bus.
+ if (sparkListenerBus.isEmpty || !LiveListenerBus.withinListenerThread.value ||
+ !e.isInstanceOf[QueryStartedEvent]) {
postToAll(e)
}
case _ =>
@@ -110,7 +119,10 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
listener: StreamingQueryListener,
event: StreamingQueryListener.Event): Unit = {
def shouldReport(runId: UUID): Boolean = {
- activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
+ // When loaded by Spark History Server, we should process all events coming from the replay
+ // listener bus.
+ sparkListenerBus.isEmpty ||
+ activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
}
event match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
new file mode 100644
index 0000000..a127fa5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus
+import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+import org.apache.spark.ui.SparkUI
+
+class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
+
+ override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
+ val listenerBus = new StreamingQueryListenerBus(None)
+ listenerBus.addListener(new StreamingQueryStatusListener(conf, store))
+ Seq(listenerBus)
+ }
+
+ override def setupUI(ui: SparkUI): Unit = {
+ val streamingQueryStatusStore = new StreamingQueryStatusStore(ui.store.store)
+ if (streamingQueryStatusStore.allQueryUIData.nonEmpty) {
+ new StreamingQueryTab(streamingQueryStatusStore, ui)
+ }
+ }
+
+ override def displayOrder: Int = 1
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
new file mode 100644
index 0000000..9eb14a6a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.UUID
+
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper, StreamingQueryUIData}
+import org.apache.spark.status.KVUtils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query Streaming Query state.
+ * There's no state kept in this class, so it's ok to have multiple instances of it in an
+ * application.
+ */
+class StreamingQueryStatusStore(store: KVStore) {
+
+ def allQueryUIData: Seq[StreamingQueryUIData] = {
+ val view = store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
+ KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
+ }
+
+ // visible for test
+ private[sql] def getQueryProgressData(runId: UUID): Seq[StreamingQueryProgressWrapper] = {
+ val view = store.view(classOf[StreamingQueryProgressWrapper])
+ .index("runId").first(runId.toString).last(runId.toString)
+ KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ }
+
+ private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
+ val runId = summary.runId.toString
+ val view = store.view(classOf[StreamingQueryProgressWrapper])
+ .index("runId").first(runId).last(runId)
+ val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ .map(_.progress).sortBy(_.timestamp).toArray
+ StreamingQueryUIData(summary, recentProgress)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 89aceac..ea430db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
+import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
import org.apache.spark.status.ElementTrackingStore
@@ -111,9 +111,9 @@ private[sql] class SharedState(
lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
sparkContext.ui.flatMap { ui =>
if (conf.get(STREAMING_UI_ENABLED)) {
- val statusListener = new StreamingQueryStatusListener(conf)
- new StreamingQueryTab(statusListener, ui)
- Some(statusListener)
+ val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
+ new StreamingQueryTab(new StreamingQueryStatusStore(kvStore), ui)
+ Some(new StreamingQueryStatusListener(conf, kvStore))
} else {
None
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index ffdbe9d..b66037d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -49,7 +49,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
- private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+ private val listenerBus =
+ new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))
@GuardedBy("activeQueriesSharedLock")
private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
index b98fdf1..96e4989 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -40,8 +40,8 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
}
private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
- val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
- .partition(_.isActive)
+ val (activeQueries, inactiveQueries) =
+ parent.store.allQueryUIData.partition(_.summary.isActive)
val content = mutable.ListBuffer[Node]()
// show active queries table only if there is at least one active query
@@ -176,7 +176,7 @@ class StreamingQueryPagedTable(
val streamingQuery = query.streamingUIData
val statisticsLink = "%s/%s/statistics?id=%s"
.format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,
- streamingQuery.runId)
+ streamingQuery.summary.runId)
def details(detail: Any): Seq[Node] = {
if (isActive) {
@@ -194,14 +194,14 @@ class StreamingQueryPagedTable(
<tr>
<td>{UIUtils.getQueryName(streamingQuery)}</td>
<td>{UIUtils.getQueryStatus(streamingQuery)}</td>
- <td>{streamingQuery.id}</td>
- <td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
- <td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
+ <td>{streamingQuery.summary.id}</td>
+ <td><a href={statisticsLink}>{streamingQuery.summary.runId}</a></td>
+ <td>{SparkUIUtils.formatDate(streamingQuery.summary.startTimestamp)}</td>
<td>{SparkUIUtils.formatDurationVerbose(query.duration)}</td>
<td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
- {details(streamingQuery.exception.getOrElse("-"))}
+ {details(streamingQuery.summary.exception.getOrElse("-"))}
</tr>
}
}
@@ -222,32 +222,32 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St
override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)
- private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
+ private def streamingRow(uiData: StreamingQueryUIData): StructuredStreamingRow = {
val duration = if (isActive) {
- System.currentTimeMillis() - query.startTimestamp
+ System.currentTimeMillis() - uiData.summary.startTimestamp
} else {
- withNoProgress(query, {
- val endTimeMs = query.lastProgress.timestamp
- parseProgressTimestamp(endTimeMs) - query.startTimestamp
+ withNoProgress(uiData, {
+ val endTimeMs = uiData.lastProgress.timestamp
+ parseProgressTimestamp(endTimeMs) - uiData.summary.startTimestamp
}, 0)
}
- val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
- query.recentProgress.length)
+ val avgInput = (uiData.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
+ uiData.recentProgress.length)
- val avgProcess = (query.recentProgress.map(p =>
- withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)
+ val avgProcess = (uiData.recentProgress.map(p =>
+ withNumberInvalid(p.processedRowsPerSecond)).sum / uiData.recentProgress.length)
- StructuredStreamingRow(duration, avgInput, avgProcess, query)
+ StructuredStreamingRow(duration, avgInput, avgProcess, uiData)
}
private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
- case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
- case "Status" => Ordering.by(q => UIUtils.getQueryStatus(q.streamingUIData))
- case "ID" => Ordering.by(_.streamingUIData.id)
- case "Run ID" => Ordering.by(_.streamingUIData.runId)
- case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
+ case "Name" => Ordering.by(row => UIUtils.getQueryName(row.streamingUIData))
+ case "Status" => Ordering.by(row => UIUtils.getQueryStatus(row.streamingUIData))
+ case "ID" => Ordering.by(_.streamingUIData.summary.id)
+ case "Run ID" => Ordering.by(_.streamingUIData.summary.runId)
+ case "Start Time" => Ordering.by(_.streamingUIData.summary.startTimestamp)
case "Duration" => Ordering.by(_.duration)
case "Avg Input /sec" => Ordering.by(_.avgInput)
case "Avg Process /sec" => Ordering.by(_.avgProcess)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index 24709ba..97691d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -58,8 +58,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
- val query = parent.statusListener.allQueryStatus.find { case q =>
- q.runId.equals(UUID.fromString(parameterId))
+ val query = parent.store.allQueryUIData.find { uiData =>
+ uiData.summary.runId.equals(UUID.fromString(parameterId))
}.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId"))
val resources = generateLoadResources(request)
@@ -109,34 +109,35 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
<script>{Unparsed(js)}</script>
}
- def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
- val duration = if (query.isActive) {
- SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
+ def generateBasicInfo(uiData: StreamingQueryUIData): Seq[Node] = {
+ val duration = if (uiData.summary.isActive) {
+ val durationMs = System.currentTimeMillis() - uiData.summary.startTimestamp
+ SparkUIUtils.formatDurationVerbose(durationMs)
} else {
- withNoProgress(query, {
- val end = query.lastProgress.timestamp
- val start = query.recentProgress.head.timestamp
+ withNoProgress(uiData, {
+ val end = uiData.lastProgress.timestamp
+ val start = uiData.recentProgress.head.timestamp
SparkUIUtils.formatDurationVerbose(
parseProgressTimestamp(end) - parseProgressTimestamp(start))
}, "-")
}
- val name = UIUtils.getQueryName(query)
- val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0)
+ val name = UIUtils.getQueryName(uiData)
+ val numBatches = withNoProgress(uiData, { uiData.lastProgress.batchId + 1L }, 0)
<div>Running batches for
<strong>
{duration}
</strong>
since
<strong>
- {SparkUIUtils.formatDate(query.startTimestamp)}
+ {SparkUIUtils.formatDate(uiData.summary.startTimestamp)}
</strong>
(<strong>{numBatches}</strong> completed batches)
</div>
<br />
<div><strong>Name: </strong>{name}</div>
- <div><strong>Id: </strong>{query.id}</div>
- <div><strong>RunId: </strong>{query.runId}</div>
+ <div><strong>Id: </strong>{uiData.summary.id}</div>
+ <div><strong>RunId: </strong>{uiData.summary.runId}</div>
<br />
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index e331083..fdd3754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.fasterxml.jackson.annotation.JsonIgnore
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
/**
* A customized StreamingQueryListener used in structured streaming UI, which contains all
* UI data for both active and inactive query.
- * TODO: Add support for history server.
*/
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
-
- /**
- * We use runId as the key here instead of id in active query status map,
- * because the runId is unique for every started query, even it its a restart.
- */
- private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
- private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+ conf: SparkConf,
+ store: ElementTrackingStore) extends StreamingQueryListener {
private val streamingProgressRetention =
conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
+ store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) { count =>
+ cleanupInactiveQueries(count)
+ }
+
+ // Events from the same query run will never be processed concurrently, so it's safe to
+ // access the per-run progress id queues held in `queryToProgress` without any protection.
+ private val queryToProgress = new ConcurrentHashMap[UUID, mutable.Queue[String]]()
+
+ private def cleanupInactiveQueries(count: Long): Unit = {
+ val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+ val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+ val numInactiveQueries = inactiveQueries.size
+ if (numInactiveQueries <= inactiveQueryStatusRetention) {
+ return
+ }
+ val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
+ .take(numInactiveQueries - inactiveQueryStatusRetention)
+ val runIds = toDelete.map { e =>
+ store.delete(e.getClass, e.runId)
+ e.runId.toString
+ }
+ // Delete wrappers in one pass, as deleting them for each summary is slow
+ store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], "runId", runIds)
+ }
+
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val startTimestamp = parseProgressTimestamp(event.timestamp)
- activeQueryStatus.putIfAbsent(event.runId,
- new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
+ store.write(new StreamingQueryData(
+ event.name,
+ event.id,
+ event.runId,
+ isActive = true,
+ None,
+ startTimestamp
+ ), checkTriggers = true)
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
- val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
- val queryStatus = activeQueryStatus.getOrDefault(
- event.progress.runId,
- new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
- batchTimestamp))
- queryStatus.updateProcess(event.progress, streamingProgressRetention)
- }
-
- override def onQueryTerminated(
- event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
- val queryStatus = activeQueryStatus.remove(event.runId)
- if (queryStatus != null) {
- queryStatus.queryTerminated(event)
- inactiveQueryStatus += queryStatus
- while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
- inactiveQueryStatus.dequeue()
- }
+ val runId = event.progress.runId
+ val batchId = event.progress.batchId
+ val timestamp = event.progress.timestamp
+ if (!queryToProgress.containsKey(runId)) {
+ queryToProgress.put(runId, mutable.Queue.empty[String])
+ }
+ val progressIds = queryToProgress.get(runId)
+ progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+ store.write(new StreamingQueryProgressWrapper(event.progress))
+ while (progressIds.length > streamingProgressRetention) {
+ val uniqueId = progressIds.dequeue
+ store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
}
}
- def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
- activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+ override def onQueryTerminated(
+ event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
+ val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+ val curTime = System.currentTimeMillis()
+ store.write(new StreamingQueryData(
+ querySummary.name,
+ querySummary.id,
+ querySummary.runId,
+ isActive = false,
+ querySummary.exception,
+ querySummary.startTimestamp,
+ Some(curTime)
+ ), checkTriggers = true)
+ queryToProgress.remove(event.runId)
}
}
+private[sql] class StreamingQueryData(
+ val name: String,
+ val id: UUID,
+ @KVIndexParam val runId: UUID,
+ @KVIndexParam("active") val isActive: Boolean,
+ val exception: Option[String],
+ @KVIndexParam("startTimestamp") val startTimestamp: Long,
+ val endTimestamp: Option[Long] = None)
+
/**
* This class contains all message related to UI display, each instance corresponds to a single
* [[org.apache.spark.sql.streaming.StreamingQuery]].
*/
-private[ui] class StreamingQueryUIData(
- val name: String,
- val id: UUID,
- val runId: UUID,
- val startTimestamp: Long) {
-
- /** Holds the most recent query progress updates. */
- private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
-
- private var _isActive = true
- private var _exception: Option[String] = None
-
- def isActive: Boolean = synchronized { _isActive }
-
- def exception: Option[String] = synchronized { _exception }
-
- def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
- _isActive = false
- _exception = event.exception
- }
-
- def updateProcess(
- newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
- progressBuffer += newProgress
- while (progressBuffer.length >= retentionNum) {
- progressBuffer.dequeue()
+private[sql] case class StreamingQueryUIData(
+ summary: StreamingQueryData,
+ recentProgress: Array[StreamingQueryProgress]) {
+
+ def lastProgress: StreamingQueryProgress = {
+ if (recentProgress.nonEmpty) {
+ recentProgress.last
+ } else {
+ null
}
}
+}
- def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
- progressBuffer.toArray
- }
+private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+ @JsonIgnore @KVIndex
+ private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
- def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
- progressBuffer.lastOption.orNull
+ @JsonIgnore @KVIndex("runId")
+ private def runIdIndex: String = progress.runId.toString
+}
+
+private[sql] object StreamingQueryProgressWrapper {
+ /**
+ * The `timestamp` is part of the unique id to support reporting `empty` query progress,
+ * i.e. progress updates in which no data arrives and the batchId stays the same.
+ */
+ def getUniqueId(
+ runId: UUID,
+ batchId: Long,
+ timestamp: String): String = {
+ s"${runId}_${batchId}_$timestamp"
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index bb097ff..65cad8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.streaming.ui
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[sql] class StreamingQueryTab(
- val statusListener: StreamingQueryStatusListener,
+ val store: StreamingQueryStatusStore,
sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
override val name = "Structured Streaming"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
index 1f7e65d..88a110f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -46,19 +46,19 @@ private[ui] object UIUtils {
}
}
- def getQueryName(query: StreamingQueryUIData): String = {
- if (query.name == null || query.name.isEmpty) {
+ def getQueryName(uiData: StreamingQueryUIData): String = {
+ if (uiData.summary.name == null || uiData.summary.name.isEmpty) {
"<no name>"
} else {
- query.name
+ uiData.summary.name
}
}
- def getQueryStatus(query: StreamingQueryUIData): String = {
- if (query.isActive) {
+ def getQueryStatus(uiData: StreamingQueryUIData): String = {
+ if (uiData.summary.isActive) {
"RUNNING"
} else {
- query.exception.map(_ => "FAILED").getOrElse("FINISHED")
+ uiData.summary.exception.map(_ => "FAILED").getOrElse("FINISHED")
}
}
diff --git a/sql/core/src/test/resources/spark-events/local-1596020211915 b/sql/core/src/test/resources/spark-events/local-1596020211915
new file mode 100644
index 0000000..ff34bbc
--- /dev/null
+++ b/sql/core/src/test/resources/spark-events/local-1596020211915
@@ -0,0 +1,160 @@
+{"Event":"SparkListenerLogStart","Spark Version":"3.1.0-SNAPSHOT"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery Script":"","Vendor":""},"memory":{"Resource Name":"memory","Amount":1024,"Discovery Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0}}}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1596020212090,"Executor ID":"driver","Executor Info":{"Host":"iZbp19vpr16ix621sdw476Z","Total Cores":4,"Log Urls":{},"Attributes":{},"Resources":{},"Resource Profile Id":0}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Port":39845},"Maximum Memory":384093388,"Timestamp":1596020212109,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre","Java Version":"1.8.0_252 (Oracle Corporation)","Scala Version":"version 2.12.10"},"Spark Properties":{"spark.driver.host":"iZbp19vpr16ix621sdw476Z","spark.eventLog.enabled":"true","spark.driver.port":"46309","spark.jars":"file:/root/spark-3.1.0-SNAPSHOT-bin-hadoop2.8/./examples/jars/spark-examples_2.12-3.1.0-SNAPSHOT.jar","spark.app.name":"Structure [...]
+{"Event":"SparkListenerApplicationStart","App Name":"StructuredKafkaWordCount","App ID":"local-1596020211915","Timestamp":1596020210919,"User":"root"}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent","id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:56:55.947Z"}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":0,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":1,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1596020221633,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Partiti [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1596020221738,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1596020221738,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222649,"Failed":false,"Killed":false,"Accumulables":[{"ID":21,"Name":"shuffle write time","Update":"9599308","Value":"9599308","Interna [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Partiti [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number of [...]
+{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1596020222709,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":0,"Attempt":0,"Launch Time":1596020222713,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":0,"Attempt":0,"Launch Time":1596020222713,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222954,"Failed":false,"Killed":false,"Accumulables":[{"ID":44,"Name":"duration","Update":"19","Value":"19","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1596020222709,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222965,"Failed":false,"Killed":false,"Accumulables":[{"ID":44,"Name":"duration","Update":"33","Value":"52","Internal":true,"Count Failed Value [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number of [...]
+{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1596020222973,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":2,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":2,"time":1596020223062}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":1,"time":1596020223069}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":0,"time":1596020223069}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:56:56.015Z","batchId":0,"batchDuration":7110,"durationMs":{"triggerExecution":7109,"queryPlanning":439,"getBatch":21,"latestOffset":3524,"addBatch":3011,"walCommit":35},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":776,"numL [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":3,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":4,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"SparkListenerJobStart","Job ID":1,"Submission Time":1596020223482,"Stage Infos":[{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":0,"Attempt":0,"Launch Time":1596020223493,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":2,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":0,"Attempt":0,"Launch Time":1596020223493,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223601,"Failed":false,"Killed":false,"Accumulables":[{"ID":178,"Name":"shuffle write time","Update":"837580","Value":"837580","Internal [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":3,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerTaskStart","Stage ID":3,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1596020223625,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":3,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1596020223626,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":3,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1596020223625,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223717,"Failed":false,"Killed":false,"Accumulables":[{"ID":201,"Name":"duration","Update":"4","Value":"4","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":3,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1596020223626,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223720,"Failed":false,"Killed":false,"Accumulables":[{"ID":201,"Name":"duration","Update":"4","Value":"8","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":3,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerJobEnd","Job ID":1,"Completion Time":1596020223725,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":5,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":5,"time":1596020223761}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":4,"time":1596020223762}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":3,"time":1596020223762}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:03.168Z","batchId":1,"batchDuration":622,"durationMs":{"triggerExecution":622,"queryPlanning":47,"getBatch":0,"latestOffset":7,"addBatch":478,"walCommit":59},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1216,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":6,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":7,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"SparkListenerJobStart","Job ID":2,"Submission Time":1596020224100,"Stage Infos":[{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":f [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":4,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":30,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"74\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[29],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerTaskStart","Stage ID":4,"Stage Attempt ID":0,"Task Info":{"Task ID":6,"Index":0,"Attempt":0,"Launch Time":1596020224113,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":4,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":6,"Index":0,"Attempt":0,"Launch Time":1596020224113,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224174,"Failed":false,"Killed":false,"Accumulables":[{"ID":335,"Name":"shuffle write time","Update":"686296","Value":"686296","Internal [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":4,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":30,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"74\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[29],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerTaskStart","Stage ID":5,"Stage Attempt ID":0,"Task Info":{"Task ID":7,"Index":0,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":5,"Stage Attempt ID":0,"Task Info":{"Task ID":8,"Index":1,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":5,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":7,"Index":0,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224256,"Failed":false,"Killed":false,"Accumulables":[{"ID":358,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":5,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":8,"Index":1,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224257,"Failed":false,"Killed":false,"Accumulables":[{"ID":358,"Name":"duration","Update":"4","Value":"7","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerJobEnd","Job ID":2,"Completion Time":1596020224259,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":8,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":8,"time":1596020224287}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":7,"time":1596020224287}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":6,"time":1596020224288}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:03.793Z","batchId":2,"batchDuration":522,"durationMs":{"triggerExecution":522,"queryPlanning":41,"getBatch":1,"latestOffset":3,"addBatch":421,"walCommit":27},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":9,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":10,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":3,"Submission Time":1596020224533,"Stage Infos":[{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerTaskStart","Stage ID":6,"Stage Attempt ID":0,"Task Info":{"Task ID":9,"Index":0,"Attempt":0,"Launch Time":1596020224541,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":6,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":9,"Index":0,"Attempt":0,"Launch Time":1596020224541,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224581,"Failed":false,"Killed":false,"Accumulables":[{"ID":492,"Name":"shuffle write time","Update":"643278","Value":"643278","Internal [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":7,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":47,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"99\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[46],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerTaskStart","Stage ID":7,"Stage Attempt ID":0,"Task Info":{"Task ID":10,"Index":0,"Attempt":0,"Launch Time":1596020224596,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":7,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":1,"Attempt":0,"Launch Time":1596020224597,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":7,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":10,"Index":0,"Attempt":0,"Launch Time":1596020224596,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224670,"Failed":false,"Killed":false,"Accumulables":[{"ID":515,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":7,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":11,"Index":1,"Attempt":0,"Launch Time":1596020224597,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224687,"Failed":false,"Killed":false,"Accumulables":[{"ID":515,"Name":"duration","Update":"4","Value":"7","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":7,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":47,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"99\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[46],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerJobEnd","Job ID":3,"Completion Time":1596020224689,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":11,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":11,"time":1596020224713}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":10,"time":1596020224714}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":9,"time":1596020224714}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:04.317Z","batchId":3,"batchDuration":415,"durationMs":{"triggerExecution":415,"queryPlanning":38,"getBatch":1,"latestOffset":3,"addBatch":332,"walCommit":21},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":12,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":13,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":4,"Submission Time":1596020224928,"Stage Infos":[{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier": [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":8,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":54,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"140\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[53],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerTaskStart","Stage ID":8,"Stage Attempt ID":0,"Task Info":{"Task ID":12,"Index":0,"Attempt":0,"Launch Time":1596020224941,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":8,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":12,"Index":0,"Attempt":0,"Launch Time":1596020224941,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224979,"Failed":false,"Killed":false,"Accumulables":[{"ID":649,"Name":"shuffle write time","Update":"572754","Value":"572754","Interna [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":8,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":54,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"140\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[53],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":9,"Stage Attempt ID":0,"Task Info":{"Task ID":13,"Index":0,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":9,"Stage Attempt ID":0,"Task Info":{"Task ID":14,"Index":1,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":9,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":14,"Index":1,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225056,"Failed":false,"Killed":false,"Accumulables":[{"ID":672,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":9,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":13,"Index":0,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225058,"Failed":false,"Killed":false,"Accumulables":[{"ID":672,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":4,"Completion Time":1596020225059,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":14,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":14,"time":1596020225087}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":13,"time":1596020225087}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":12,"time":1596020225087}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:04.734Z","batchId":4,"batchDuration":387,"durationMs":{"triggerExecution":387,"queryPlanning":30,"getBatch":1,"latestOffset":3,"addBatch":306,"walCommit":12},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":15,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":16,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":5,"Submission Time":1596020225342,"Stage Infos":[{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Numbe [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerTaskStart","Stage ID":10,"Stage Attempt ID":0,"Task Info":{"Task ID":15,"Index":0,"Attempt":0,"Launch Time":1596020225359,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":15,"Index":0,"Attempt":0,"Launch Time":1596020225359,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225400,"Failed":false,"Killed":false,"Accumulables":[{"ID":806,"Name":"shuffle write time","Update":"530930","Value":"530930","Intern [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":11,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":71,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"165\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[70],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":11,"Stage Attempt ID":0,"Task Info":{"Task ID":16,"Index":0,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":11,"Stage Attempt ID":0,"Task Info":{"Task ID":17,"Index":1,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":17,"Index":1,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225498,"Failed":false,"Killed":false,"Accumulables":[{"ID":829,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":16,"Index":0,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225509,"Failed":false,"Killed":false,"Accumulables":[{"ID":829,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":11,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":71,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"165\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[70],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":5,"Completion Time":1596020225514,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":17,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":17,"time":1596020225541}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":16,"time":1596020225542}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":15,"time":1596020225542}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.123Z","batchId":5,"batchDuration":437,"durationMs":{"triggerExecution":437,"queryPlanning":35,"getBatch":1,"latestOffset":3,"addBatch":361,"walCommit":18},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":18,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":19,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":6,"Submission Time":1596020225759,"Stage Infos":[{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Numbe [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerTaskStart","Stage ID":12,"Stage Attempt ID":0,"Task Info":{"Task ID":18,"Index":0,"Attempt":0,"Launch Time":1596020225766,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":12,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":18,"Index":0,"Attempt":0,"Launch Time":1596020225766,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225796,"Failed":false,"Killed":false,"Accumulables":[{"ID":963,"Name":"shuffle write time","Update":"543836","Value":"543836","Intern [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":13,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":83,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"198\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[82],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":13,"Stage Attempt ID":0,"Task Info":{"Task ID":19,"Index":0,"Attempt":0,"Launch Time":1596020225808,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":13,"Stage Attempt ID":0,"Task Info":{"Task ID":20,"Index":1,"Attempt":0,"Launch Time":1596020225809,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":13,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":19,"Index":0,"Attempt":0,"Launch Time":1596020225808,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225868,"Failed":false,"Killed":false,"Accumulables":[{"ID":986,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":13,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":20,"Index":1,"Attempt":0,"Launch Time":1596020225809,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225874,"Failed":false,"Killed":false,"Accumulables":[{"ID":986,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":13,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":83,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"198\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[82],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":6,"Completion Time":1596020225875,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":20,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":20,"time":1596020225896}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":19,"time":1596020225897}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":18,"time":1596020225897}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.562Z","batchId":6,"batchDuration":351,"durationMs":{"triggerExecution":351,"queryPlanning":28,"getBatch":1,"latestOffset":6,"addBatch":273,"walCommit":25},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":21,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":22,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":7,"Submission Time":1596020226076,"Stage Infos":[{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier" [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":14,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":90,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"239\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[89],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerTaskStart","Stage ID":14,"Stage Attempt ID":0,"Task Info":{"Task ID":21,"Index":0,"Attempt":0,"Launch Time":1596020226086,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":14,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":21,"Index":0,"Attempt":0,"Launch Time":1596020226086,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226116,"Failed":false,"Killed":false,"Accumulables":[{"ID":1120,"Name":"shuffle write time","Update":"543034","Value":"543034","Inter [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":14,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":90,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"239\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[89],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":15,"Stage Attempt ID":0,"Task Info":{"Task ID":22,"Index":0,"Attempt":0,"Launch Time":1596020226128,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":15,"Stage Attempt ID":0,"Task Info":{"Task ID":23,"Index":1,"Attempt":0,"Launch Time":1596020226129,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":15,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":23,"Index":1,"Attempt":0,"Launch Time":1596020226129,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226196,"Failed":false,"Killed":false,"Accumulables":[{"ID":1143,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":15,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":22,"Index":0,"Attempt":0,"Launch Time":1596020226128,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226204,"Failed":false,"Killed":false,"Accumulables":[{"ID":1143,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":7,"Completion Time":1596020226204,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":23,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":23,"time":1596020226230}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":22,"time":1596020226231}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":21,"time":1596020226231}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.916Z","batchId":7,"batchDuration":341,"durationMs":{"triggerExecution":341,"queryPlanning":24,"getBatch":0,"latestOffset":3,"addBatch":271,"walCommit":14},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"SparkListenerApplicationEnd","Timestamp":1596020226301}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala
similarity index 53%
copy from sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
copy to sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala
index bb097ff..f73305b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
+++ b/sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala
@@ -14,26 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.streaming.ui
-import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{SparkUI, SparkUITab}
+package org.apache.spark.deploy.history
-private[sql] class StreamingQueryTab(
- val statusListener: StreamingQueryStatusListener,
- sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.History.HISTORY_LOG_DIR
+import org.apache.spark.util.ManualClock
- override val name = "Structured Streaming"
-
- val parent = sparkUI
-
- attachPage(new StreamingQueryPage(this))
- attachPage(new StreamingQueryStatisticsPage(this))
- parent.attachTab(this)
-
- parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
-}
-
-private[sql] object StreamingQueryTab {
- private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+object Utils {
+  /**
+   * Runs `fn` against a freshly created [[FsHistoryProvider]] whose HISTORY_LOG_DIR is set to
+   * `logDir`, and stops the provider afterwards, even when the log scan or `fn` throws.
+   *
+   * @param logDir value assigned to the HISTORY_LOG_DIR setting of the provider's SparkConf
+   * @param fn callback invoked with the provider after `checkForLogs()` has been called
+   */
+  def withFsHistoryProvider(logDir: String)(fn: FsHistoryProvider => Unit): Unit = {
+    val clock = new ManualClock()
+    val conf = new SparkConf().set(HISTORY_LOG_DIR, logDir)
+    // Create the provider before entering `try` so that `finally` refers to this very instance.
+    // Previously a second `provider` was declared inside `try`, shadowing an outer null-initialised
+    // variable; the outer one therefore stayed null and stop() was never invoked.
+    val provider = new FsHistoryProvider(conf, clock)
+    try {
+      provider.checkForLogs()
+      fn(provider)
+    } finally {
+      provider.stop()
+    }
+  }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala
new file mode 100644
index 0000000..160535e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.Locale
+import javax.servlet.http.HttpServletRequest
+
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.deploy.history.{Utils => HsUtils}
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Loads the `spark-events` classpath resource through the history provider and checks that
+ * the streaming query pages can be rendered from the resulting application store.
+ */
+class StreamingQueryHistorySuite extends SharedSparkSession with BeforeAndAfter {
+
+  test("support streaming query events") {
+    val logDir = Thread.currentThread().getContextClassLoader.getResource("spark-events").toString
+    HsUtils.withFsHistoryProvider(logDir) { provider =>
+      // `fail` throws and has type Nothing, so no placeholder `null` is needed after it.
+      val appUi = provider.getAppUI("local-1596020211915", None).getOrElse {
+        fail("Failed to load event log of local-1596020211915.")
+      }
+      assert(appUi.ui.appName == "StructuredKafkaWordCount")
+      assert(appUi.ui.store.store.count(classOf[StreamingQueryData]) == 1)
+      assert(appUi.ui.store.store.count(classOf[StreamingQueryProgressWrapper]) == 8)
+
+      // Render the summary page on top of the store rebuilt from the event log.
+      val store = new StreamingQueryStatusStore(appUi.ui.store.store)
+      val tab = new StreamingQueryTab(store, appUi.ui)
+      val request = mock(classOf[HttpServletRequest])
+      var html = new StreamingQueryPage(tab).render(request)
+        .toString().toLowerCase(Locale.ROOT)
+      // 81.39: Avg Input /sec
+      assert(html.contains("81.39"))
+      // 157.05: Avg Process /sec
+      assert(html.contains("157.05"))
+
+      // The statistics page is addressed by run id through the "id" request parameter.
+      val id = "8d268dc2-bc9c-4be8-97a9-b135d2943028"
+      val runId = "e225d92f-2545-48f8-87a2-9c0309580f8a"
+      when(request.getParameter("id")).thenReturn(runId)
+      html = new StreamingQueryStatisticsPage(tab).render(request)
+        .toString().toLowerCase(Locale.ROOT)
+      assert(html.contains("<strong>8</strong> completed batches"))
+      assert(html.contains(id))
+      assert(html.contains(runId))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index c2b6688..246fa1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.streaming.ui
import java.util.{Locale, UUID}
import javax.servlet.http.HttpServletRequest
+import scala.xml.Node
+
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.BeforeAndAfter
-import scala.xml.Node
import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.ui.SparkUI
@@ -35,26 +37,26 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
val id = UUID.randomUUID()
val request = mock(classOf[HttpServletRequest])
val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
- val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
- when(tab.statusListener).thenReturn(statusListener)
+ when(tab.store).thenReturn(store)
val streamQuery = createStreamQueryUIData(id)
- when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+ when(store.allQueryUIData).thenReturn(Seq(streamQuery))
var html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("active streaming queries (1)"))
- when(streamQuery.isActive).thenReturn(false)
- when(streamQuery.exception).thenReturn(None)
+ when(streamQuery.summary.isActive).thenReturn(false)
+ when(streamQuery.summary.exception).thenReturn(None)
html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("completed streaming queries (1)"))
assert(html.contains("finished"))
- when(streamQuery.isActive).thenReturn(false)
- when(streamQuery.exception).thenReturn(Option("exception in query"))
+ when(streamQuery.summary.isActive).thenReturn(false)
+ when(streamQuery.summary.exception).thenReturn(Option("exception in query"))
html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("completed streaming queries (1)"))
@@ -66,17 +68,20 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
val id = UUID.randomUUID()
val request = mock(classOf[HttpServletRequest])
val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
- val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS)
+ when(request.getParameter("id")).thenReturn(id.toString)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ when(tab.store).thenReturn(store)
val ui = mock(classOf[SparkUI])
when(request.getParameter("id")).thenReturn(id.toString)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
- when(tab.statusListener).thenReturn(statusListener)
when(ui.conf).thenReturn(new SparkConf())
when(tab.parent).thenReturn(ui)
val streamQuery = createStreamQueryUIData(id)
- when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+ when(store.allQueryUIData).thenReturn(Seq(streamQuery))
val html = renderStreamingQueryStatisticsPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
@@ -94,15 +99,18 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
when(progress.batchId).thenReturn(2)
when(progress.prettyJson).thenReturn("""{"a":1}""")
+ val summary = mock(classOf[StreamingQueryData], RETURNS_SMART_NULLS)
+ when(summary.isActive).thenReturn(true)
+ when(summary.name).thenReturn("query")
+ when(summary.id).thenReturn(id)
+ when(summary.runId).thenReturn(id)
+ when(summary.startTimestamp).thenReturn(1L)
+ when(summary.exception).thenReturn(None)
+
val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
- when(streamQuery.isActive).thenReturn(true)
- when(streamQuery.name).thenReturn("query")
- when(streamQuery.id).thenReturn(id)
- when(streamQuery.runId).thenReturn(id)
- when(streamQuery.startTimestamp).thenReturn(1L)
+ when(streamQuery.summary).thenReturn(summary)
when(streamQuery.lastProgress).thenReturn(progress)
when(streamQuery.recentProgress).thenReturn(Array(progress))
- when(streamQuery.exception).thenReturn(None)
streamQuery
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 6aa440e..91c55d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -17,19 +17,28 @@
package org.apache.spark.sql.streaming.ui
-import java.util.UUID
+import java.text.SimpleDateFormat
+import java.util.{Date, UUID}
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
+import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
import org.apache.spark.sql.streaming
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
class StreamingQueryStatusListenerSuite extends StreamTest {
test("onQueryStarted, onQueryProgress, onQueryTerminated") {
- val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
+ val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+ val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
+ val queryStore = new StreamingQueryStatusStore(kvStore)
- // hanlde query started event
+ // handle query started event
val id = UUID.randomUUID()
val runId = UUID.randomUUID()
val startEvent = new StreamingQueryListener.QueryStartedEvent(
@@ -37,8 +46,9 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryStarted(startEvent)
// result checking
- assert(listener.activeQueryStatus.size() == 1)
- assert(listener.activeQueryStatus.get(runId).name == "test")
+ assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
+ assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData =>
+ uiData.summary.runId == runId && uiData.summary.name.equals("test")))
// handle query progress event
val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
@@ -53,28 +63,32 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryProgress(processEvent)
// result checking
- val activeQuery = listener.activeQueryStatus.get(runId)
- assert(activeQuery.isActive)
- assert(activeQuery.recentProgress.length == 1)
- assert(activeQuery.lastProgress.id == id)
- assert(activeQuery.lastProgress.runId == runId)
- assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
- assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0)
- assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0)
- assert(activeQuery.lastProgress.batchId == 2)
- assert(activeQuery.lastProgress.prettyJson == """{"a":1}""")
+ val activeQuery =
+ queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == runId)
+ assert(activeQuery.isDefined)
+ assert(activeQuery.get.summary.isActive)
+ assert(activeQuery.get.recentProgress.length == 1)
+ assert(activeQuery.get.lastProgress.id == id)
+ assert(activeQuery.get.lastProgress.runId == runId)
+ assert(activeQuery.get.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
+ assert(activeQuery.get.lastProgress.inputRowsPerSecond == 10.0)
+ assert(activeQuery.get.lastProgress.processedRowsPerSecond == 12.0)
+ assert(activeQuery.get.lastProgress.batchId == 2)
+ assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""")
// handle terminate event
val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
listener.onQueryTerminated(terminateEvent)
- assert(!listener.inactiveQueryStatus.head.isActive)
- assert(listener.inactiveQueryStatus.head.runId == runId)
- assert(listener.inactiveQueryStatus.head.id == id)
+ assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId)
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id)
}
test("same query start multiple times") {
- val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
+ val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+ val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
+ val queryStore = new StreamingQueryStatusStore(kvStore)
// handle first time start
val id = UUID.randomUUID()
@@ -94,11 +108,106 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
listener.onQueryStarted(startEvent1)
// result checking
- assert(listener.activeQueryStatus.size() == 1)
- assert(listener.inactiveQueryStatus.length == 1)
- assert(listener.activeQueryStatus.containsKey(runId1))
- assert(listener.activeQueryStatus.get(runId1).id == id)
- assert(listener.inactiveQueryStatus.head.runId == runId0)
- assert(listener.inactiveQueryStatus.head.id == id)
+ assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
+ assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(_.summary.runId == runId1))
+ assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData =>
+ uiData.summary.runId == runId1 && uiData.summary.id == id))
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId0)
+ assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id)
+ }
+
+  // With the retained-queries limit set to 2, terminating three queries must leave only the
+  // two most recently terminated ones in the store.
+  test("test small retained queries") {
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    // Configure a private copy: setting the key on the live SparkContext conf would leak the
+    // lowered limit into later tests that build a listener from the same session.
+    val conf = spark.sparkContext.conf.clone()
+    conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2")
+    val listener = new StreamingQueryStatusListener(conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
+
+    // Starts a query named "test1" stamped with the current time and returns (id, runId).
+    def addNewQuery(): (UUID, UUID) = {
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      format.setTimeZone(getTimeZone("UTC"))
+      val id = UUID.randomUUID()
+      val runId = UUID.randomUUID()
+      val startEvent = new StreamingQueryListener.QueryStartedEvent(
+        id, runId, "test1", format.format(new Date(System.currentTimeMillis())))
+      listener.onQueryStarted(startEvent)
+      (id, runId)
+    }
+
+    // Retries for up to 10 seconds until the inactive queries match the expected count and ids.
+    def checkInactiveQueryStatus(numInactives: Int, targetInactives: Seq[UUID]): Unit = {
+      eventually(timeout(10.seconds)) {
+        val inactiveQueries = queryStore.allQueryUIData.filter(!_.summary.isActive)
+        assert(inactiveQueries.size == numInactives)
+        assert(inactiveQueries.map(_.summary.id).toSet == targetInactives.toSet)
+      }
+    }
+
+    val (id1, runId1) = addNewQuery()
+    val (id2, runId2) = addNewQuery()
+    val (id3, runId3) = addNewQuery()
+    assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)
+
+    val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None)
+    listener.onQueryTerminated(terminateEvent1)
+    checkInactiveQueryStatus(1, Seq(id1))
+    val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None)
+    listener.onQueryTerminated(terminateEvent2)
+    checkInactiveQueryStatus(2, Seq(id1, id2))
+    // The third termination exceeds the limit of 2, so the first query is expected to be gone.
+    val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None)
+    listener.onQueryTerminated(terminateEvent3)
+    checkInactiveQueryStatus(2, Seq(id2, id3))
+  }
+
+  // With the retained-progress limit set to 5, a sixth progress update for the same run must
+  // not grow the stored progress data beyond five entries.
+  test("test small retained progress") {
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    // Configure a private copy: setting the key on the live SparkContext conf would leak the
+    // lowered limit into later tests that build a listener from the same session.
+    val conf = spark.sparkContext.conf.clone()
+    conf.set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "5")
+    val listener = new StreamingQueryStatusListener(conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
+
+    val id = UUID.randomUUID()
+    val runId = UUID.randomUUID()
+    val startEvent = new StreamingQueryListener.QueryStartedEvent(
+      id, runId, "test", "2016-12-05T20:54:20.827Z")
+    listener.onQueryStarted(startEvent)
+
+    // Batch id handed to the next mocked progress; incremented once per mock created.
+    var batchId: Int = 0
+
+    // Builds a mocked progress for the given query/run, stamped with the current time.
+    def mockProgressData(id: UUID, runId: UUID): StreamingQueryProgress = {
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      format.setTimeZone(getTimeZone("UTC"))
+
+      val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+      when(progress.id).thenReturn(id)
+      when(progress.runId).thenReturn(runId)
+      when(progress.timestamp).thenReturn(format.format(new Date(System.currentTimeMillis())))
+      when(progress.inputRowsPerSecond).thenReturn(10.0)
+      when(progress.processedRowsPerSecond).thenReturn(12.0)
+      when(progress.batchId).thenReturn(batchId)
+      when(progress.prettyJson).thenReturn("""{"a":1}""")
+
+      batchId += 1
+      progress
+    }
+
+    // Sends one progress event for the query started above.
+    def addQueryProgress(): Unit = {
+      val progress = mockProgressData(id, runId)
+      val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
+      listener.onQueryProgress(processEvent)
+    }
+
+    // Retries for up to 10 seconds until the stored progress count equals `targetNum`.
+    def checkQueryProcessData(targetNum: Int): Unit = {
+      eventually(timeout(10.seconds)) {
+        assert(queryStore.getQueryProgressData(runId).size == targetNum)
+      }
+    }
+
+    // Run purely for the side effect; no need to allocate an Array[Unit] of results.
+    (1 to 4).foreach(_ => addQueryProgress())
+    checkQueryProcessData(4)
+    addQueryProgress()
+    checkQueryProcessData(5)
+    addQueryProgress()
+    checkQueryProcessData(5)
   }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org