Posted to commits@spark.apache.org by zs...@apache.org on 2020/12/03 01:13:23 UTC

[spark] branch master updated: [SPARK-31953][SS] Add Spark Structured Streaming History Server Support

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f96670  [SPARK-31953][SS] Add Spark Structured Streaming History Server Support
4f96670 is described below

commit 4f9667035886a67e6c9a4e8fad2efa390e87ca68
Author: uncleGen <hu...@gmail.com>
AuthorDate: Wed Dec 2 17:11:51 2020 -0800

    [SPARK-31953][SS] Add Spark Structured Streaming History Server Support
    
    ### What changes were proposed in this pull request?
    
    Add Spark Structured Streaming History Server Support.
    
    ### Why are the changes needed?
    
    Add a streaming query history server plugin.
    
    ![image](https://user-images.githubusercontent.com/7402327/84248291-d26cfe80-ab3b-11ea-86d2-98205fa2bcc4.png)
    ![image](https://user-images.githubusercontent.com/7402327/84248347-e44ea180-ab3b-11ea-81de-eefe207656f2.png)
    ![image](https://user-images.githubusercontent.com/7402327/84248396-f0d2fa00-ab3b-11ea-9b0d-e410115471b0.png)
    
    - Follow-ups
      - The query duration should not keep updating in the history UI.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated unit tests.
    
    Closes #28781 from uncleGen/SPARK-31953.
    
    Lead-authored-by: uncleGen <hu...@gmail.com>
    Co-authored-by: Genmao Yu <hu...@gmail.com>
    Co-authored-by: Yuanjian Li <yu...@databricks.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 dev/.rat-excludes                                  |   1 +
 .../org.apache.spark.status.AppHistoryServerPlugin |   1 +
 .../streaming/StreamingQueryListenerBus.scala      |  26 +++-
 .../ui/StreamingQueryHistoryServerPlugin.scala     |  43 ++++++
 .../execution/ui/StreamingQueryStatusStore.scala   |  53 +++++++
 .../apache/spark/sql/internal/SharedState.scala    |   8 +-
 .../sql/streaming/StreamingQueryManager.scala      |   3 +-
 .../sql/streaming/ui/StreamingQueryPage.scala      |  44 +++---
 .../ui/StreamingQueryStatisticsPage.scala          |  27 ++--
 .../ui/StreamingQueryStatusListener.scala          | 166 +++++++++++++--------
 .../spark/sql/streaming/ui/StreamingQueryTab.scala |   3 +-
 .../apache/spark/sql/streaming/ui/UIUtils.scala    |  12 +-
 .../resources/spark-events/local-1596020211915     | 160 ++++++++++++++++++++
 .../org/apache/spark/deploy/history/Utils.scala}   |  39 ++---
 .../streaming/ui/StreamingQueryHistorySuite.scala  |  63 ++++++++
 .../sql/streaming/ui/StreamingQueryPageSuite.scala |  42 +++---
 .../ui/StreamingQueryStatusListenerSuite.scala     | 159 ++++++++++++++++----
 17 files changed, 673 insertions(+), 177 deletions(-)

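The pieces below fit together through Spark's AppHistoryServerPlugin extension point: a
plugin contributes listeners that consume the replayed event log, plus a hook that attaches
UI tabs once the store has been rebuilt. As a minimal sketch (hypothetical class name; the
trait is Spark-internal, so the sketch lives under the org.apache.spark namespace, and the
three overrides mirror the methods implemented by this commit):

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
    import org.apache.spark.ui.SparkUI

    class ExampleHistoryServerPlugin extends AppHistoryServerPlugin {
      // Listeners that consume the replayed event log and write UI data to the store.
      override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] =
        Seq.empty

      // Called after replay; attaches tabs/pages to the rebuilt SparkUI.
      override def setupUI(ui: SparkUI): Unit = ()

      // Relative ordering among plugins when setting up the UI.
      override def displayOrder: Int = Int.MaxValue
    }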
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 7da330d..167cf22 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -123,6 +123,7 @@ SessionHandler.java
 GangliaReporter.java
 application_1578436911597_0052
 config.properties
+local-1596020211915
 app-20200706201101-0003
 py.typed
 _metadata
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
index 0bba2f8..6771eef 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -1 +1,2 @@
 org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin
+org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin
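Entries in this services file are discovered through the standard java.util.ServiceLoader
mechanism when the history server rebuilds an application's UI. A quick illustrative check
(not the exact history-server code path):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    import org.apache.spark.status.AppHistoryServerPlugin

    // After this commit, the printed names include
    // org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin.
    ServiceLoader.load(classOf[AppHistoryServerPlugin]).asScala.toSeq
      .sortBy(_.displayOrder)
      .foreach(p => println(p.getClass.getName))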
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 1b8d69f..4b98acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -31,16 +31,21 @@ import org.apache.spark.util.ListenerBus
  * Spark listener bus, so that it can receive [[StreamingQueryListener.Event]]s and dispatch them
  * to StreamingQueryListeners.
  *
- * Note that each bus and its registered listeners are associated with a single SparkSession
+ * Note 1: Each bus and its registered listeners are associated with a single SparkSession
  * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
  * those queries that were started in the associated SparkSession.
+ *
+ * Note 2: To rebuild the Structured Streaming UI in the Spark History Server (SHS), this bus is
+ * registered into [[org.apache.spark.scheduler.ReplayListenerBus]]. We check whether
+ * `sparkListenerBus` is defined to decide how to process [[StreamingQueryListener.Event]]s: if it
+ * is undefined, this bus is being used to replay all streaming query events from the event log.
  */
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
   extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {
 
   import StreamingQueryListener._
 
-  sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
+  sparkListenerBus.foreach(_.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY))
 
   /**
    * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -67,11 +72,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
     event match {
       case s: QueryStartedEvent =>
         activeQueryRunIds.synchronized { activeQueryRunIds += s.runId }
-        sparkListenerBus.post(s)
+        sparkListenerBus.foreach(bus => bus.post(s))
         // post to local listeners to trigger callbacks
         postToAll(s)
       case _ =>
-        sparkListenerBus.post(event)
+        sparkListenerBus.foreach(bus => bus.post(event))
     }
   }
 
@@ -95,7 +100,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
         // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
         // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
         // thread
-        if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
+        //
+        // When loaded by the Spark History Server, we should process all events coming from the
+        // replay listener bus.
+        if (sparkListenerBus.isEmpty || !LiveListenerBus.withinListenerThread.value ||
+            !e.isInstanceOf[QueryStartedEvent]) {
           postToAll(e)
         }
       case _ =>
@@ -110,7 +119,10 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
       listener: StreamingQueryListener,
       event: StreamingQueryListener.Event): Unit = {
     def shouldReport(runId: UUID): Boolean = {
-      activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
+      // When loaded by the Spark History Server, we should process all events coming from the
+      // replay listener bus.
+      sparkListenerBus.isEmpty ||
+        activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
     }
 
     event match {
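The net effect of the changes above is that one bus class now serves two modes, selected by
the new Option-typed constructor argument. A minimal sketch of the two construction paths
(constructors taken from this commit; sparkSession assumed in scope):

    // Live mode (StreamingQueryManager): events are also forwarded to the application's
    // LiveListenerBus, and only events from active runIds are reported to listeners.
    val liveBus = new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))

    // Replay mode (history server plugin): no live bus exists, so every replayed event is
    // dispatched to the attached listeners.
    val replayBus = new StreamingQueryListenerBus(None)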
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
new file mode 100644
index 0000000..a127fa5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus
+import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+import org.apache.spark.ui.SparkUI
+
+class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
+
+  override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
+    val listenerBus = new StreamingQueryListenerBus(None)
+    listenerBus.addListener(new StreamingQueryStatusListener(conf, store))
+    Seq(listenerBus)
+  }
+
+  override def setupUI(ui: SparkUI): Unit = {
+    val streamingQueryStatusStore = new StreamingQueryStatusStore(ui.store.store)
+    if (streamingQueryStatusStore.allQueryUIData.nonEmpty) {
+      new StreamingQueryTab(streamingQueryStatusStore, ui)
+    }
+  }
+
+  override def displayOrder: Int = 1
+}
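The two overrides are exercised in separate phases: createListeners runs before the event
log is replayed, and setupUI runs afterwards, adding the tab only when the store actually
contains streaming query data. A sketch of driving the plugin manually, e.g. from a test
(for illustration; store and ui assumed in scope):

    import org.apache.spark.SparkConf

    val plugin = new StreamingQueryHistoryServerPlugin()
    // Phase 1: register these listeners on the replay bus before replaying the event log.
    val listeners = plugin.createListeners(new SparkConf(), store)
    // Phase 2: after replay, attach the Structured Streaming tab if any query data exists.
    plugin.setupUI(ui)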
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
new file mode 100644
index 0000000..9eb14a6a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryStatusStore.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.UUID
+
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper, StreamingQueryUIData}
+import org.apache.spark.status.KVUtils
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query Streaming Query state.
+ * There's no state kept in this class, so it's ok to have multiple instances of it in an
+ * application.
+ */
+class StreamingQueryStatusStore(store: KVStore) {
+
+  def allQueryUIData: Seq[StreamingQueryUIData] = {
+    val view = store.view(classOf[StreamingQueryData]).index("startTimestamp").first(0L)
+    KVUtils.viewToSeq(view, Int.MaxValue)(_ => true).map(makeUIData)
+  }
+
+  // visible for testing
+  private[sql] def getQueryProgressData(runId: UUID): Seq[StreamingQueryProgressWrapper] = {
+    val view = store.view(classOf[StreamingQueryProgressWrapper])
+      .index("runId").first(runId.toString).last(runId.toString)
+    KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+  }
+
+  private def makeUIData(summary: StreamingQueryData): StreamingQueryUIData = {
+    val runId = summary.runId.toString
+    val view = store.view(classOf[StreamingQueryProgressWrapper])
+      .index("runId").first(runId).last(runId)
+    val recentProgress = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+      .map(_.progress).sortBy(_.timestamp).toArray
+    StreamingQueryUIData(summary, recentProgress)
+  }
+}
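Once a KVStore has been populated (live or replayed), this view can be queried directly.
A minimal usage sketch (kvStore assumed to be an already-loaded
org.apache.spark.util.kvstore.KVStore, e.g. the one behind ui.store):

    val statusStore = new StreamingQueryStatusStore(kvStore)
    statusStore.allQueryUIData.foreach { uiData =>
      val s = uiData.summary
      println(s"run=${s.runId} active=${s.isActive} progressCount=${uiData.recentProgress.length}")
    }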
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 89aceac..ea430db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
+import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
 import org.apache.spark.status.ElementTrackingStore
@@ -111,9 +111,9 @@ private[sql] class SharedState(
   lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
     sparkContext.ui.flatMap { ui =>
       if (conf.get(STREAMING_UI_ENABLED)) {
-        val statusListener = new StreamingQueryStatusListener(conf)
-        new StreamingQueryTab(statusListener, ui)
-        Some(statusListener)
+        val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
+        new StreamingQueryTab(new StreamingQueryStatusStore(kvStore), ui)
+        Some(new StreamingQueryStatusListener(conf, kvStore))
       } else {
         None
       }
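With this change the live UI takes the same store-backed path as the history server: the
listener writes query summaries and progress into the application's ElementTrackingStore,
and the tab reads them back through StreamingQueryStatusStore. The wiring, condensed from
the diff above (conf, ui, and sparkContext assumed in scope):

    val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
    val listener = new StreamingQueryStatusListener(conf, kvStore)    // writer side
    new StreamingQueryTab(new StreamingQueryStatusStore(kvStore), ui) // reader side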
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index ffdbe9d..b66037d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -49,7 +49,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
-  private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+  private val listenerBus =
+    new StreamingQueryListenerBus(Some(sparkSession.sparkContext.listenerBus))
 
   @GuardedBy("activeQueriesSharedLock")
   private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
index b98fdf1..96e4989 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -40,8 +40,8 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
   }
 
   private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
-    val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
-      .partition(_.isActive)
+    val (activeQueries, inactiveQueries) =
+      parent.store.allQueryUIData.partition(_.summary.isActive)
 
     val content = mutable.ListBuffer[Node]()
     // show active queries table only if there is at least one active query
@@ -176,7 +176,7 @@ class StreamingQueryPagedTable(
     val streamingQuery = query.streamingUIData
     val statisticsLink = "%s/%s/statistics?id=%s"
       .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,
-        streamingQuery.runId)
+        streamingQuery.summary.runId)
 
     def details(detail: Any): Seq[Node] = {
       if (isActive) {
@@ -194,14 +194,14 @@ class StreamingQueryPagedTable(
     <tr>
       <td>{UIUtils.getQueryName(streamingQuery)}</td>
       <td>{UIUtils.getQueryStatus(streamingQuery)}</td>
-      <td>{streamingQuery.id}</td>
-      <td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
-      <td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
+      <td>{streamingQuery.summary.id}</td>
+      <td><a href={statisticsLink}>{streamingQuery.summary.runId}</a></td>
+      <td>{SparkUIUtils.formatDate(streamingQuery.summary.startTimestamp)}</td>
       <td>{SparkUIUtils.formatDurationVerbose(query.duration)}</td>
       <td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
       <td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
       <td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
-      {details(streamingQuery.exception.getOrElse("-"))}
+      {details(streamingQuery.summary.exception.getOrElse("-"))}
     </tr>
   }
 }
@@ -222,32 +222,32 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St
 
   override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)
 
-  private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
+  private def streamingRow(uiData: StreamingQueryUIData): StructuredStreamingRow = {
     val duration = if (isActive) {
-      System.currentTimeMillis() - query.startTimestamp
+      System.currentTimeMillis() - uiData.summary.startTimestamp
     } else {
-      withNoProgress(query, {
-        val endTimeMs = query.lastProgress.timestamp
-        parseProgressTimestamp(endTimeMs) - query.startTimestamp
+      withNoProgress(uiData, {
+        val endTimeMs = uiData.lastProgress.timestamp
+        parseProgressTimestamp(endTimeMs) - uiData.summary.startTimestamp
       }, 0)
     }
 
-    val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
-      query.recentProgress.length)
+    val avgInput = (uiData.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
+      uiData.recentProgress.length)
 
-    val avgProcess = (query.recentProgress.map(p =>
-      withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)
+    val avgProcess = (uiData.recentProgress.map(p =>
+      withNumberInvalid(p.processedRowsPerSecond)).sum / uiData.recentProgress.length)
 
-    StructuredStreamingRow(duration, avgInput, avgProcess, query)
+    StructuredStreamingRow(duration, avgInput, avgProcess, uiData)
   }
 
   private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
     val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
-      case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
-      case "Status" => Ordering.by(q => UIUtils.getQueryStatus(q.streamingUIData))
-      case "ID" => Ordering.by(_.streamingUIData.id)
-      case "Run ID" => Ordering.by(_.streamingUIData.runId)
-      case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
+      case "Name" => Ordering.by(row => UIUtils.getQueryName(row.streamingUIData))
+      case "Status" => Ordering.by(row => UIUtils.getQueryStatus(row.streamingUIData))
+      case "ID" => Ordering.by(_.streamingUIData.summary.id)
+      case "Run ID" => Ordering.by(_.streamingUIData.summary.runId)
+      case "Start Time" => Ordering.by(_.streamingUIData.summary.startTimestamp)
       case "Duration" => Ordering.by(_.duration)
       case "Avg Input /sec" => Ordering.by(_.avgInput)
       case "Avg Process /sec" => Ordering.by(_.avgProcess)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
index 24709ba..97691d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
@@ -58,8 +58,8 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
     val parameterId = request.getParameter("id")
     require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
 
-    val query = parent.statusListener.allQueryStatus.find { case q =>
-      q.runId.equals(UUID.fromString(parameterId))
+    val query = parent.store.allQueryUIData.find { uiData =>
+      uiData.summary.runId.equals(UUID.fromString(parameterId))
     }.getOrElse(throw new IllegalArgumentException(s"Failed to find streaming query $parameterId"))
 
     val resources = generateLoadResources(request)
@@ -109,34 +109,35 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
     <script>{Unparsed(js)}</script>
   }
 
-  def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
-    val duration = if (query.isActive) {
-      SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
+  def generateBasicInfo(uiData: StreamingQueryUIData): Seq[Node] = {
+    val duration = if (uiData.summary.isActive) {
+      val durationMs = System.currentTimeMillis() - uiData.summary.startTimestamp
+      SparkUIUtils.formatDurationVerbose(durationMs)
     } else {
-      withNoProgress(query, {
-        val end = query.lastProgress.timestamp
-        val start = query.recentProgress.head.timestamp
+      withNoProgress(uiData, {
+        val end = uiData.lastProgress.timestamp
+        val start = uiData.recentProgress.head.timestamp
         SparkUIUtils.formatDurationVerbose(
           parseProgressTimestamp(end) - parseProgressTimestamp(start))
       }, "-")
     }
 
-    val name = UIUtils.getQueryName(query)
-    val numBatches = withNoProgress(query, { query.lastProgress.batchId + 1L }, 0)
+    val name = UIUtils.getQueryName(uiData)
+    val numBatches = withNoProgress(uiData, { uiData.lastProgress.batchId + 1L }, 0)
     <div>Running batches for
       <strong>
         {duration}
       </strong>
       since
       <strong>
-        {SparkUIUtils.formatDate(query.startTimestamp)}
+        {SparkUIUtils.formatDate(uiData.summary.startTimestamp)}
       </strong>
       (<strong>{numBatches}</strong> completed batches)
     </div>
     <br />
     <div><strong>Name: </strong>{name}</div>
-    <div><strong>Id: </strong>{query.id}</div>
-    <div><strong>RunId: </strong>{query.runId}</div>
+    <div><strong>Id: </strong>{uiData.summary.id}</div>
+    <div><strong>RunId: </strong>{uiData.summary.runId}</div>
     <br />
   }
 
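Duration in the page header is computed two ways: wall-clock time since start for an active
query, and the span between the first and last progress timestamps for a finished one. A
worked example of the finished-query branch, using the first and last progress events from
the sample event log added below:

    import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp

    val start = parseProgressTimestamp("2020-07-29T10:56:56.015Z")
    val end   = parseProgressTimestamp("2020-07-29T10:57:03.793Z")
    val durationMs = end - start  // 7778 ms, rendered via SparkUIUtils.formatDurationVerbose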
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index e331083..fdd3754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -20,102 +20,144 @@ package org.apache.spark.sql.streaming.ui
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.fasterxml.jackson.annotation.JsonIgnore
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._
 import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.KVIndex
 
 /**
  * A customized StreamingQueryListener used in structured streaming UI, which contains all
  * UI data for both active and inactive queries.
- * TODO: Add support for history server.
  */
-private[sql] class StreamingQueryStatusListener(conf: SparkConf) extends StreamingQueryListener {
-
-  /**
-   * We use runId as the key here instead of id in active query status map,
-   * because the runId is unique for every started query, even it its a restart.
-   */
-  private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
-  private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+private[sql] class StreamingQueryStatusListener(
+    conf: SparkConf,
+    store: ElementTrackingStore) extends StreamingQueryListener {
 
   private val streamingProgressRetention =
     conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
   private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES)
 
+  store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) { count =>
+    cleanupInactiveQueries(count)
+  }
+
+  // Events from the same query run will never be processed concurrently, so it's safe to
+  // access `progressIds` without any protection.
+  private val queryToProgress = new ConcurrentHashMap[UUID, mutable.Queue[String]]()
+
+  private def cleanupInactiveQueries(count: Long): Unit = {
+    val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false)
+    val inactiveQueries = KVUtils.viewToSeq(view, Int.MaxValue)(_ => true)
+    val numInactiveQueries = inactiveQueries.size
+    if (numInactiveQueries <= inactiveQueryStatusRetention) {
+      return
+    }
+    val toDelete = inactiveQueries.sortBy(_.endTimestamp.get)
+      .take(numInactiveQueries - inactiveQueryStatusRetention)
+    val runIds = toDelete.map { e =>
+      store.delete(e.getClass, e.runId)
+      e.runId.toString
+    }
+    // Delete wrappers in one pass, as deleting them for each summary is slow
+    store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], "runId", runIds)
+  }
+
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     val startTimestamp = parseProgressTimestamp(event.timestamp)
-    activeQueryStatus.putIfAbsent(event.runId,
-      new StreamingQueryUIData(event.name, event.id, event.runId, startTimestamp))
+    store.write(new StreamingQueryData(
+      event.name,
+      event.id,
+      event.runId,
+      isActive = true,
+      None,
+      startTimestamp
+    ), checkTriggers = true)
   }
 
   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
-    val batchTimestamp = parseProgressTimestamp(event.progress.timestamp)
-    val queryStatus = activeQueryStatus.getOrDefault(
-      event.progress.runId,
-      new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
-        batchTimestamp))
-    queryStatus.updateProcess(event.progress, streamingProgressRetention)
-  }
-
-  override def onQueryTerminated(
-      event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
-    val queryStatus = activeQueryStatus.remove(event.runId)
-    if (queryStatus != null) {
-      queryStatus.queryTerminated(event)
-      inactiveQueryStatus += queryStatus
-      while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
-        inactiveQueryStatus.dequeue()
-      }
+    val runId = event.progress.runId
+    val batchId = event.progress.batchId
+    val timestamp = event.progress.timestamp
+    if (!queryToProgress.containsKey(runId)) {
+      queryToProgress.put(runId, mutable.Queue.empty[String])
+    }
+    val progressIds = queryToProgress.get(runId)
+    progressIds.enqueue(getUniqueId(runId, batchId, timestamp))
+    store.write(new StreamingQueryProgressWrapper(event.progress))
+    while (progressIds.length > streamingProgressRetention) {
+      val uniqueId = progressIds.dequeue
+      store.delete(classOf[StreamingQueryProgressWrapper], uniqueId)
     }
   }
 
-  def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
-    activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+  override def onQueryTerminated(
+      event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
+    val querySummary = store.read(classOf[StreamingQueryData], event.runId)
+    val curTime = System.currentTimeMillis()
+    store.write(new StreamingQueryData(
+      querySummary.name,
+      querySummary.id,
+      querySummary.runId,
+      isActive = false,
+      querySummary.exception,
+      querySummary.startTimestamp,
+      Some(curTime)
+    ), checkTriggers = true)
+    queryToProgress.remove(event.runId)
   }
 }
 
+private[sql] class StreamingQueryData(
+    val name: String,
+    val id: UUID,
+    @KVIndexParam val runId: UUID,
+    @KVIndexParam("active") val isActive: Boolean,
+    val exception: Option[String],
+    @KVIndexParam("startTimestamp") val startTimestamp: Long,
+    val endTimestamp: Option[Long] = None)
+
 /**
  * This class contains all message related to UI display, each instance corresponds to a single
  * [[org.apache.spark.sql.streaming.StreamingQuery]].
  */
-private[ui] class StreamingQueryUIData(
-    val name: String,
-    val id: UUID,
-    val runId: UUID,
-    val startTimestamp: Long) {
-
-  /** Holds the most recent query progress updates. */
-  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
-
-  private var _isActive = true
-  private var _exception: Option[String] = None
-
-  def isActive: Boolean = synchronized { _isActive }
-
-  def exception: Option[String] = synchronized { _exception }
-
-  def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
-    _isActive = false
-    _exception = event.exception
-  }
-
-  def updateProcess(
-      newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
-    progressBuffer += newProgress
-    while (progressBuffer.length >= retentionNum) {
-      progressBuffer.dequeue()
+private[sql] case class StreamingQueryUIData(
+    summary: StreamingQueryData,
+    recentProgress: Array[StreamingQueryProgress]) {
+
+  def lastProgress: StreamingQueryProgress = {
+    if (recentProgress.nonEmpty) {
+      recentProgress.last
+    } else {
+      null
     }
   }
+}
 
-  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
-    progressBuffer.toArray
-  }
+private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+  @JsonIgnore @KVIndex
+  private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
 
-  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
-    progressBuffer.lastOption.orNull
+  @JsonIgnore @KVIndex("runId")
+  private def runIdIndex: String = progress.runId.toString
+}
+
+private[sql] object StreamingQueryProgressWrapper {
+  /**
+   * `timestamp` is included in the unique id to support reporting `empty` query progress
+   * events, in which no data arrives but the batchId stays the same.
+   */
+  def getUniqueId(
+      runId: UUID,
+      batchId: Long,
+      timestamp: String): String = {
+    s"${runId}_${batchId}_$timestamp"
   }
 }
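The unique id scheme is what lets repeated `empty` progress events for the same batch
coexist in the store. A quick worked example (runId and the first timestamp taken from the
sample event log below; the second timestamp is made up to stand in for an empty batch):

    import java.util.UUID

    val runId = UUID.fromString("e225d92f-2545-48f8-87a2-9c0309580f8a")
    // Same batchId, different timestamps: two distinct KVStore keys.
    val id1 = StreamingQueryProgressWrapper.getUniqueId(runId, 1L, "2020-07-29T10:57:03.168Z")
    val id2 = StreamingQueryProgressWrapper.getUniqueId(runId, 1L, "2020-07-29T10:57:04.000Z")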
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
index bb097ff..65cad8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -17,10 +17,11 @@
 package org.apache.spark.sql.streaming.ui
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 private[sql] class StreamingQueryTab(
-    val statusListener: StreamingQueryStatusListener,
+    val store: StreamingQueryStatusStore,
     sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
 
   override val name = "Structured Streaming"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
index 1f7e65d..88a110f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -46,19 +46,19 @@ private[ui] object UIUtils {
     }
   }
 
-  def getQueryName(query: StreamingQueryUIData): String = {
-    if (query.name == null || query.name.isEmpty) {
+  def getQueryName(uiData: StreamingQueryUIData): String = {
+    if (uiData.summary.name == null || uiData.summary.name.isEmpty) {
       "<no name>"
     } else {
-      query.name
+      uiData.summary.name
     }
   }
 
-  def getQueryStatus(query: StreamingQueryUIData): String = {
-    if (query.isActive) {
+  def getQueryStatus(uiData: StreamingQueryUIData): String = {
+    if (uiData.summary.isActive) {
       "RUNNING"
     } else {
-      query.exception.map(_ => "FAILED").getOrElse("FINISHED")
+      uiData.summary.exception.map(_ => "FAILED").getOrElse("FINISHED")
     }
   }
 
diff --git a/sql/core/src/test/resources/spark-events/local-1596020211915 b/sql/core/src/test/resources/spark-events/local-1596020211915
new file mode 100644
index 0000000..ff34bbc
--- /dev/null
+++ b/sql/core/src/test/resources/spark-events/local-1596020211915
@@ -0,0 +1,160 @@
+{"Event":"SparkListenerLogStart","Spark Version":"3.1.0-SNAPSHOT"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery Script":"","Vendor":""},"memory":{"Resource Name":"memory","Amount":1024,"Discovery Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0}}}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1596020212090,"Executor ID":"driver","Executor Info":{"Host":"iZbp19vpr16ix621sdw476Z","Total Cores":4,"Log Urls":{},"Attributes":{},"Resources":{},"Resource Profile Id":0}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Port":39845},"Maximum Memory":384093388,"Timestamp":1596020212109,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre","Java Version":"1.8.0_252 (Oracle Corporation)","Scala Version":"version 2.12.10"},"Spark Properties":{"spark.driver.host":"iZbp19vpr16ix621sdw476Z","spark.eventLog.enabled":"true","spark.driver.port":"46309","spark.jars":"file:/root/spark-3.1.0-SNAPSHOT-bin-hadoop2.8/./examples/jars/spark-examples_2.12-3.1.0-SNAPSHOT.jar","spark.app.name":"Structure [...]
+{"Event":"SparkListenerApplicationStart","App Name":"StructuredKafkaWordCount","App ID":"local-1596020211915","Timestamp":1596020210919,"User":"root"}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent","id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:56:55.947Z"}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":0,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":1,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1596020221633,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of  [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Partiti [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1596020221738,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1596020221738,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222649,"Failed":false,"Killed":false,"Accumulables":[{"ID":21,"Name":"shuffle write time","Update":"9599308","Value":"9599308","Interna [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Partiti [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number of [...]
+{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1596020222709,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":0,"Attempt":0,"Launch Time":1596020222713,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":0,"Attempt":0,"Launch Time":1596020222713,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222954,"Failed":false,"Killed":false,"Accumulables":[{"ID":44,"Name":"duration","Update":"19","Value":"19","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1596020222709,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020222965,"Failed":false,"Killed":false,"Accumulables":[{"ID":44,"Name":"duration","Update":"33","Value":"52","Internal":true,"Count Failed Value [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number of [...]
+{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1596020222973,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":2,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 0","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":2,"time":1596020223062}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":1,"time":1596020223069}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":0,"time":1596020223069}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:56:56.015Z","batchId":0,"batchDuration":7110,"durationMs":{"triggerExecution":7109,"queryPlanning":439,"getBatch":21,"latestOffset":3524,"addBatch":3011,"walCommit":35},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":776,"numL [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":3,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":4,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"SparkListenerJobStart","Job ID":1,"Submission Time":1596020223482,"Stage Infos":[{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number  [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":0,"Attempt":0,"Launch Time":1596020223493,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":2,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":0,"Attempt":0,"Launch Time":1596020223493,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223601,"Failed":false,"Killed":false,"Accumulables":[{"ID":178,"Name":"shuffle write time","Update":"837580","Value":"837580","Internal [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":18,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"41\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":3,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerTaskStart","Stage ID":3,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1596020223625,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":3,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1596020223626,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":3,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1596020223625,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223717,"Failed":false,"Killed":false,"Accumulables":[{"ID":201,"Name":"duration","Update":"4","Value":"4","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":3,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1596020223626,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020223720,"Failed":false,"Killed":false,"Accumulables":[{"ID":201,"Name":"duration","Update":"4","Value":"8","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":3,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerJobEnd","Job ID":1,"Completion Time":1596020223725,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":5,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 1","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":5,"time":1596020223761}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":4,"time":1596020223762}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":3,"time":1596020223762}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:03.168Z","batchId":1,"batchDuration":622,"durationMs":{"triggerExecution":622,"queryPlanning":47,"getBatch":0,"latestOffset":7,"addBatch":478,"walCommit":59},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1216,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":6,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":7,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"SparkListenerJobStart","Job ID":2,"Submission Time":1596020224100,"Stage Infos":[{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":f [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":4,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":30,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"74\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[29],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerTaskStart","Stage ID":4,"Stage Attempt ID":0,"Task Info":{"Task ID":6,"Index":0,"Attempt":0,"Launch Time":1596020224113,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":4,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":6,"Index":0,"Attempt":0,"Launch Time":1596020224113,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224174,"Failed":false,"Killed":false,"Accumulables":[{"ID":335,"Name":"shuffle write time","Update":"686296","Value":"686296","Internal [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":4,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":30,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"74\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[29],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Part [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerTaskStart","Stage ID":5,"Stage Attempt ID":0,"Task Info":{"Task ID":7,"Index":0,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":5,"Stage Attempt ID":0,"Task Info":{"Task ID":8,"Index":1,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":5,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":7,"Index":0,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224256,"Failed":false,"Killed":false,"Accumulables":[{"ID":358,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":5,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":8,"Index":1,"Attempt":0,"Launch Time":1596020224187,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224257,"Failed":false,"Killed":false,"Accumulables":[{"ID":358,"Name":"duration","Update":"4","Value":"7","Internal":true,"Count Failed Val [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":5,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":35,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"66\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[34],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerJobEnd","Job ID":2,"Completion Time":1596020224259,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":8,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 2","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":8,"time":1596020224287}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":7,"time":1596020224287}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":6,"time":1596020224288}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:03.793Z","batchId":2,"batchDuration":522,"durationMs":{"triggerExecution":522,"queryPlanning":41,"getBatch":1,"latestOffset":3,"addBatch":421,"walCommit":27},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":9,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKa [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":10,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":3,"Submission Time":1596020224533,"Stage Infos":[{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerTaskStart","Stage ID":6,"Stage Attempt ID":0,"Task Info":{"Task ID":9,"Index":0,"Attempt":0,"Launch Time":1596020224541,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":6,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":9,"Index":0,"Attempt":0,"Launch Time":1596020224541,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224581,"Failed":false,"Killed":false,"Accumulables":[{"ID":492,"Name":"shuffle write time","Update":"643278","Value":"643278","Internal [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":6,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":42,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"107\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[41],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":7,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":47,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"99\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[46],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerTaskStart","Stage ID":7,"Stage Attempt ID":0,"Task Info":{"Task ID":10,"Index":0,"Attempt":0,"Launch Time":1596020224596,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":7,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":1,"Attempt":0,"Launch Time":1596020224597,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":7,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":10,"Index":0,"Attempt":0,"Launch Time":1596020224596,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224670,"Failed":false,"Killed":false,"Accumulables":[{"ID":515,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":7,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":11,"Index":1,"Attempt":0,"Launch Time":1596020224597,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224687,"Failed":false,"Killed":false,"Accumulables":[{"ID":515,"Name":"duration","Update":"4","Value":"7","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":7,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":47,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"99\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[46],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number o [...]
+{"Event":"SparkListenerJobEnd","Job ID":3,"Completion Time":1596020224689,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":11,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 3","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":11,"time":1596020224713}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":10,"time":1596020224714}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":9,"time":1596020224714}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:04.317Z","batchId":3,"batchDuration":415,"durationMs":{"triggerExecution":415,"queryPlanning":38,"getBatch":1,"latestOffset":3,"addBatch":332,"walCommit":21},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":12,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":13,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":4,"Submission Time":1596020224928,"Stage Infos":[{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier": [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":8,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":54,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"140\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[53],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerTaskStart","Stage ID":8,"Stage Attempt ID":0,"Task Info":{"Task ID":12,"Index":0,"Attempt":0,"Launch Time":1596020224941,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":8,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":12,"Index":0,"Attempt":0,"Launch Time":1596020224941,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020224979,"Failed":false,"Killed":false,"Accumulables":[{"ID":649,"Name":"shuffle write time","Update":"572754","Value":"572754","Interna [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":8,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":54,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"140\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[53],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Par [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number  [...]
+{"Event":"SparkListenerTaskStart","Stage ID":9,"Stage Attempt ID":0,"Task Info":{"Task ID":13,"Index":0,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":9,"Stage Attempt ID":0,"Task Info":{"Task ID":14,"Index":1,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":9,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":14,"Index":1,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225056,"Failed":false,"Killed":false,"Accumulables":[{"ID":672,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":9,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":13,"Index":0,"Attempt":0,"Launch Time":1596020224994,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225058,"Failed":false,"Killed":false,"Accumulables":[{"ID":672,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed Va [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":9,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":59,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"132\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[58],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number  [...]
+{"Event":"SparkListenerJobEnd","Job ID":4,"Completion Time":1596020225059,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":14,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 4","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":14,"time":1596020225087}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":13,"time":1596020225087}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":12,"time":1596020225087}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:04.734Z","batchId":4,"batchDuration":387,"durationMs":{"triggerExecution":387,"queryPlanning":30,"getBatch":1,"latestOffset":3,"addBatch":306,"walCommit":12},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":15,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":16,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":5,"Submission Time":1596020225342,"Stage Infos":[{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Numbe [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerTaskStart","Stage ID":10,"Stage Attempt ID":0,"Task Info":{"Task ID":15,"Index":0,"Attempt":0,"Launch Time":1596020225359,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":15,"Index":0,"Attempt":0,"Launch Time":1596020225359,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225400,"Failed":false,"Killed":false,"Accumulables":[{"ID":806,"Name":"shuffle write time","Update":"530930","Value":"530930","Intern [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":10,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":66,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"173\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[65],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":11,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":71,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"165\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[70],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":11,"Stage Attempt ID":0,"Task Info":{"Task ID":16,"Index":0,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":11,"Stage Attempt ID":0,"Task Info":{"Task ID":17,"Index":1,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":17,"Index":1,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225498,"Failed":false,"Killed":false,"Accumulables":[{"ID":829,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":11,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":16,"Index":0,"Attempt":0,"Launch Time":1596020225417,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225509,"Failed":false,"Killed":false,"Accumulables":[{"ID":829,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":11,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":71,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"165\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[70],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":5,"Completion Time":1596020225514,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":17,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 5","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":17,"time":1596020225541}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":16,"time":1596020225542}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":15,"time":1596020225542}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.123Z","batchId":5,"batchDuration":437,"durationMs":{"triggerExecution":437,"queryPlanning":35,"getBatch":1,"latestOffset":3,"addBatch":361,"walCommit":18},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":18,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":19,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":6,"Submission Time":1596020225759,"Stage Infos":[{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Numbe [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerTaskStart","Stage ID":12,"Stage Attempt ID":0,"Task Info":{"Task ID":18,"Index":0,"Attempt":0,"Launch Time":1596020225766,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":12,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":18,"Index":0,"Attempt":0,"Launch Time":1596020225766,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225796,"Failed":false,"Killed":false,"Accumulables":[{"ID":963,"Name":"shuffle write time","Update":"543836","Value":"543836","Intern [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":12,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":78,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"206\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[77],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":13,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":83,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"198\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[82],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":13,"Stage Attempt ID":0,"Task Info":{"Task ID":19,"Index":0,"Attempt":0,"Launch Time":1596020225808,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":13,"Stage Attempt ID":0,"Task Info":{"Task ID":20,"Index":1,"Attempt":0,"Launch Time":1596020225809,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":13,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":19,"Index":0,"Attempt":0,"Launch Time":1596020225808,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225868,"Failed":false,"Killed":false,"Accumulables":[{"ID":986,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":13,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":20,"Index":1,"Attempt":0,"Launch Time":1596020225809,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020225874,"Failed":false,"Killed":false,"Accumulables":[{"ID":986,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed V [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":13,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":83,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"198\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[82],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":6,"Completion Time":1596020225875,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":20,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 6","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":20,"time":1596020225896}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":19,"time":1596020225897}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":18,"time":1596020225897}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.562Z","batchId":6,"batchDuration":351,"durationMs":{"triggerExecution":351,"queryPlanning":28,"getBatch":1,"latestOffset":6,"addBatch":273,"walCommit":25},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":21,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":22,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"SparkListenerJobStart","Job ID":7,"Submission Time":1596020226076,"Stage Infos":[{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier" [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":14,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":90,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"239\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[89],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerTaskStart","Stage ID":14,"Stage Attempt ID":0,"Task Info":{"Task ID":21,"Index":0,"Attempt":0,"Launch Time":1596020226086,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":14,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":21,"Index":0,"Attempt":0,"Launch Time":1596020226086,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226116,"Failed":false,"Killed":false,"Accumulables":[{"ID":1120,"Name":"shuffle write time","Update":"543034","Value":"543034","Inter [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":14,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":1,"RDD Info":[{"RDD ID":90,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"239\",\"name\":\"Exchange\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[89],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":1,"Number of Cached Pa [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerTaskStart","Stage ID":15,"Stage Attempt ID":0,"Task Info":{"Task ID":22,"Index":0,"Attempt":0,"Launch Time":1596020226128,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":15,"Stage Attempt ID":0,"Task Info":{"Task ID":23,"Index":1,"Attempt":0,"Launch Time":1596020226129,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":15,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":23,"Index":1,"Attempt":0,"Launch Time":1596020226129,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226196,"Failed":false,"Killed":false,"Accumulables":[{"ID":1143,"Name":"duration","Update":"3","Value":"3","Internal":true,"Count Failed  [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":15,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":22,"Index":0,"Attempt":0,"Launch Time":1596020226128,"Executor ID":"driver","Host":"iZbp19vpr16ix621sdw476Z","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1596020226204,"Failed":false,"Killed":false,"Accumulables":[{"ID":1143,"Name":"duration","Update":"2","Value":"5","Internal":true,"Count Failed  [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":15,"Stage Attempt ID":0,"Stage Name":"start at StructuredKafkaWordCount.scala:86","Number of Tasks":2,"RDD Info":[{"RDD ID":95,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"231\",\"name\":\"WholeStageCodegen (4)\"}","Callsite":"start at StructuredKafkaWordCount.scala:86","Parent IDs":[94],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":2,"Number [...]
+{"Event":"SparkListenerJobEnd","Job ID":7,"Completion Time":1596020226204,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":23,"description":"\nid = 8d268dc2-bc9c-4be8-97a9-b135d2943028\nrunId = e225d92f-2545-48f8-87a2-9c0309580f8a\nbatch = 7","details":"org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:366)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:86)\norg.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredK [...]
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":23,"time":1596020226230}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":22,"time":1596020226231}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":21,"time":1596020226231}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent","progress":{"id":"8d268dc2-bc9c-4be8-97a9-b135d2943028","runId":"e225d92f-2545-48f8-87a2-9c0309580f8a","name":null,"timestamp":"2020-07-29T10:57:05.916Z","batchId":7,"batchDuration":341,"durationMs":{"triggerExecution":341,"queryPlanning":24,"getBatch":0,"latestOffset":3,"addBatch":271,"walCommit":14},"eventTime":{},"stateOperators":[{"numRowsTotal":1,"numRowsUpdated":1,"memoryUsedBytes":1184,"numLateInpu [...]
+{"Event":"SparkListenerApplicationEnd","Timestamp":1596020226301}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala
similarity index 53%
copy from sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
copy to sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala
index bb097ff..f73305b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
+++ b/sql/core/src/test/scala/org/apache/spark/deploy/history/Utils.scala
@@ -14,26 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.streaming.ui
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{SparkUI, SparkUITab}
+package org.apache.spark.deploy.history
 
-private[sql] class StreamingQueryTab(
-    val statusListener: StreamingQueryStatusListener,
-    sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.History.HISTORY_LOG_DIR
+import org.apache.spark.util.ManualClock
 
-  override val name = "Structured Streaming"
-
-  val parent = sparkUI
-
-  attachPage(new StreamingQueryPage(this))
-  attachPage(new StreamingQueryStatisticsPage(this))
-  parent.attachTab(this)
-
-  parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
-}
-
-private[sql] object StreamingQueryTab {
-  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+object Utils {
+  def withFsHistoryProvider(logDir: String)(fn: FsHistoryProvider => Unit): Unit = {
+    var provider: FsHistoryProvider = null
+    try {
+      val clock = new ManualClock()
+      val conf = new SparkConf().set(HISTORY_LOG_DIR, logDir)
+      provider = new FsHistoryProvider(conf, clock)
+      provider.checkForLogs()
+      fn(provider)
+    } finally {
+      if (provider != null) {
+        provider.stop()
+        provider = null
+      }
+    }
+  }
 }
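
A minimal usage sketch of the loan-pattern helper above, mirroring how the
suite below drives it (the application id matches the bundled event log):

    import org.apache.spark.deploy.history.{Utils => HsUtils}

    // Replay the bundled event logs and lend the provider to the test body;
    // the helper stops the provider afterwards.
    val logDir = Thread.currentThread().getContextClassLoader
      .getResource("spark-events").toString
    HsUtils.withFsHistoryProvider(logDir) { provider =>
      // getAppUI returns Option[LoadedAppUI]; None means the log failed to parse.
      assert(provider.getAppUI("local-1596020211915", None).isDefined)
    }
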
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala
new file mode 100644
index 0000000..160535e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryHistorySuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.Locale
+import javax.servlet.http.HttpServletRequest
+
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.deploy.history.{Utils => HsUtils}
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
+import org.apache.spark.sql.test.SharedSparkSession
+
+class StreamingQueryHistorySuite extends SharedSparkSession with BeforeAndAfter {
+
+  test("support streaming query events") {
+    val logDir = Thread.currentThread().getContextClassLoader.getResource("spark-events").toString
+    HsUtils.withFsHistoryProvider(logDir) { provider =>
+      val appUi = provider.getAppUI("local-1596020211915", None).getOrElse {
+        fail("Failed to load event log of local-1596020211915.")
+      }
+      assert(appUi.ui.appName == "StructuredKafkaWordCount")
+      assert(appUi.ui.store.store.count(classOf[StreamingQueryData]) == 1)
+      assert(appUi.ui.store.store.count(classOf[StreamingQueryProgressWrapper]) == 8)
+
+      val store = new StreamingQueryStatusStore(appUi.ui.store.store)
+      val tab = new StreamingQueryTab(store, appUi.ui)
+      val request = mock(classOf[HttpServletRequest])
+      var html = new StreamingQueryPage(tab).render(request)
+        .toString().toLowerCase(Locale.ROOT)
+      // 81.39: Avg Input /sec
+      assert(html.contains("81.39"))
+      // 157.05: Avg Process /sec
+      assert(html.contains("157.05"))
+
+      val id = "8d268dc2-bc9c-4be8-97a9-b135d2943028"
+      val runId = "e225d92f-2545-48f8-87a2-9c0309580f8a"
+      when(request.getParameter("id")).thenReturn(runId)
+      html = new StreamingQueryStatisticsPage(tab).render(request)
+        .toString().toLowerCase(Locale.ROOT)
+      assert(html.contains("<strong>8</strong> completed batches"))
+      assert(html.contains(id))
+      assert(html.contains(runId))
+    }
+  }
+}
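
The StreamingQueryStatusStore/StreamingQueryTab pair exercised above is what
the history server wires up when it replays an application; the plugin itself
is registered through the META-INF/services entry and its diff is not shown in
this excerpt. A rough sketch of its shape, assuming the private
AppHistoryServerPlugin hooks (createListeners/setupUI) and a
StreamingQueryListenerBus constructor that takes an optional live bus:

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.sql.execution.streaming.StreamingQueryListenerBus
    import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
    import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
    import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
    import org.apache.spark.ui.SparkUI

    // Sketch only, not the committed source: feed replayed streaming events
    // into the KVStore, then attach the tab when streaming data is present.
    class ExampleStreamingHistoryPlugin extends AppHistoryServerPlugin {
      override def createListeners(
          conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
        val bus = new StreamingQueryListenerBus(None) // assumed signature
        bus.addListener(new StreamingQueryStatusListener(conf, store))
        Seq(bus)
      }

      override def setupUI(ui: SparkUI): Unit = {
        val store = new StreamingQueryStatusStore(ui.store.store)
        if (store.allQueryUIData.nonEmpty) {
          new StreamingQueryTab(store, ui)
        }
      }
    }
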
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
index c2b6688..246fa1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.streaming.ui
 import java.util.{Locale, UUID}
 import javax.servlet.http.HttpServletRequest
 
+import scala.xml.Node
+
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.BeforeAndAfter
-import scala.xml.Node
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
 import org.apache.spark.sql.streaming.StreamingQueryProgress
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.ui.SparkUI
@@ -35,26 +37,26 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
     val id = UUID.randomUUID()
     val request = mock(classOf[HttpServletRequest])
     val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
-    val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+    val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS)
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
-    when(tab.statusListener).thenReturn(statusListener)
+    when(tab.store).thenReturn(store)
 
     val streamQuery = createStreamQueryUIData(id)
-    when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+    when(store.allQueryUIData).thenReturn(Seq(streamQuery))
     var html = renderStreamingQueryPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
     assert(html.contains("active streaming queries (1)"))
 
-    when(streamQuery.isActive).thenReturn(false)
-    when(streamQuery.exception).thenReturn(None)
+    when(streamQuery.summary.isActive).thenReturn(false)
+    when(streamQuery.summary.exception).thenReturn(None)
     html = renderStreamingQueryPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
     assert(html.contains("completed streaming queries (1)"))
     assert(html.contains("finished"))
 
-    when(streamQuery.isActive).thenReturn(false)
-    when(streamQuery.exception).thenReturn(Option("exception in query"))
+    when(streamQuery.summary.isActive).thenReturn(false)
+    when(streamQuery.summary.exception).thenReturn(Option("exception in query"))
     html = renderStreamingQueryPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
     assert(html.contains("completed streaming queries (1)"))
@@ -66,17 +68,20 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
     val id = UUID.randomUUID()
     val request = mock(classOf[HttpServletRequest])
     val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
-    val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+    val store = mock(classOf[StreamingQueryStatusStore], RETURNS_SMART_NULLS)
+    when(tab.store).thenReturn(store)
     val ui = mock(classOf[SparkUI])
     when(request.getParameter("id")).thenReturn(id.toString)
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
-    when(tab.statusListener).thenReturn(statusListener)
     when(ui.conf).thenReturn(new SparkConf())
     when(tab.parent).thenReturn(ui)
 
     val streamQuery = createStreamQueryUIData(id)
-    when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+    when(store.allQueryUIData).thenReturn(Seq(streamQuery))
     val html = renderStreamingQueryStatisticsPage(request, tab)
       .toString().toLowerCase(Locale.ROOT)
 
@@ -94,15 +99,18 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
     when(progress.batchId).thenReturn(2)
     when(progress.prettyJson).thenReturn("""{"a":1}""")
 
+    val summary = mock(classOf[StreamingQueryData], RETURNS_SMART_NULLS)
+    when(summary.isActive).thenReturn(true)
+    when(summary.name).thenReturn("query")
+    when(summary.id).thenReturn(id)
+    when(summary.runId).thenReturn(id)
+    when(summary.startTimestamp).thenReturn(1L)
+    when(summary.exception).thenReturn(None)
+
     val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
-    when(streamQuery.isActive).thenReturn(true)
-    when(streamQuery.name).thenReturn("query")
-    when(streamQuery.id).thenReturn(id)
-    when(streamQuery.runId).thenReturn(id)
-    when(streamQuery.startTimestamp).thenReturn(1L)
+    when(streamQuery.summary).thenReturn(summary)
     when(streamQuery.lastProgress).thenReturn(progress)
     when(streamQuery.recentProgress).thenReturn(Array(progress))
-    when(streamQuery.exception).thenReturn(None)
 
     streamQuery
   }
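
The reworked mocks reflect the new storage layout: the per-run summary now
lives in a KVStore row (StreamingQueryData), and StreamingQueryUIData merely
pairs that summary with the recorded progress. A hypothetical simplification
of the fields the stubs above rely on, not the committed definitions:

    import java.util.UUID

    // Hypothetical shape: one summary row per query run, keyed by runId.
    case class StreamingQueryData(
        name: String,
        id: UUID,
        runId: UUID,
        isActive: Boolean,
        exception: Option[String],
        startTimestamp: Long)
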
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 6aa440e..91c55d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -17,19 +17,28 @@
 
 package org.apache.spark.sql.streaming.ui
 
-import java.util.UUID
+import java.text.SimpleDateFormat
+import java.util.{Date, UUID}
 
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
 import org.apache.spark.sql.streaming
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class StreamingQueryStatusListenerSuite extends StreamTest {
 
   test("onQueryStarted, onQueryProgress, onQueryTerminated") {
-    val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
 
-    // hanlde query started event
+    // handle query started event
     val id = UUID.randomUUID()
     val runId = UUID.randomUUID()
     val startEvent = new StreamingQueryListener.QueryStartedEvent(
@@ -37,8 +46,9 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     listener.onQueryStarted(startEvent)
 
     // result checking
-    assert(listener.activeQueryStatus.size() == 1)
-    assert(listener.activeQueryStatus.get(runId).name == "test")
+    assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
+    assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData =>
+      uiData.summary.runId == runId && uiData.summary.name.equals("test")))
 
     // handle query progress event
     val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
@@ -53,28 +63,32 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     listener.onQueryProgress(processEvent)
 
     // result checking
-    val activeQuery = listener.activeQueryStatus.get(runId)
-    assert(activeQuery.isActive)
-    assert(activeQuery.recentProgress.length == 1)
-    assert(activeQuery.lastProgress.id == id)
-    assert(activeQuery.lastProgress.runId == runId)
-    assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
-    assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0)
-    assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0)
-    assert(activeQuery.lastProgress.batchId == 2)
-    assert(activeQuery.lastProgress.prettyJson == """{"a":1}""")
+    val activeQuery =
+      queryStore.allQueryUIData.filter(_.summary.isActive).find(_.summary.runId == runId)
+    assert(activeQuery.isDefined)
+    assert(activeQuery.get.summary.isActive)
+    assert(activeQuery.get.recentProgress.length == 1)
+    assert(activeQuery.get.lastProgress.id == id)
+    assert(activeQuery.get.lastProgress.runId == runId)
+    assert(activeQuery.get.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
+    assert(activeQuery.get.lastProgress.inputRowsPerSecond == 10.0)
+    assert(activeQuery.get.lastProgress.processedRowsPerSecond == 12.0)
+    assert(activeQuery.get.lastProgress.batchId == 2)
+    assert(activeQuery.get.lastProgress.prettyJson == """{"a":1}""")
 
     // handle terminate event
     val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
     listener.onQueryTerminated(terminateEvent)
 
-    assert(!listener.inactiveQueryStatus.head.isActive)
-    assert(listener.inactiveQueryStatus.head.runId == runId)
-    assert(listener.inactiveQueryStatus.head.id == id)
+    assert(!queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.isActive)
+    assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId)
+    assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id)
   }
 
   test("same query start multiple times") {
-    val listener = new StreamingQueryStatusListener(spark.sparkContext.conf)
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val listener = new StreamingQueryStatusListener(spark.sparkContext.conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
 
     // handle first time start
     val id = UUID.randomUUID()
@@ -94,11 +108,106 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     listener.onQueryStarted(startEvent1)
 
     // result checking
-    assert(listener.activeQueryStatus.size() == 1)
-    assert(listener.inactiveQueryStatus.length == 1)
-    assert(listener.activeQueryStatus.containsKey(runId1))
-    assert(listener.activeQueryStatus.get(runId1).id == id)
-    assert(listener.inactiveQueryStatus.head.runId == runId0)
-    assert(listener.inactiveQueryStatus.head.id == id)
+    assert(queryStore.allQueryUIData.count(_.summary.isActive) == 1)
+    assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).length == 1)
+    assert(queryStore.allQueryUIData.filter(_.summary.isActive).exists(uiData =>
+      uiData.summary.runId == runId1 && uiData.summary.id == id))
+    assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.runId == runId0)
+    assert(queryStore.allQueryUIData.filterNot(_.summary.isActive).head.summary.id == id)
+  }
+
+  test("test small retained queries") {
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val conf = spark.sparkContext.conf
+    conf.set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "2")
+    val listener = new StreamingQueryStatusListener(conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
+
+    def addNewQuery(): (UUID, UUID) = {
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      format.setTimeZone(getTimeZone("UTC"))
+      val id = UUID.randomUUID()
+      val runId = UUID.randomUUID()
+      val startEvent = new StreamingQueryListener.QueryStartedEvent(
+        id, runId, "test1", format.format(new Date(System.currentTimeMillis())))
+      listener.onQueryStarted(startEvent)
+      (id, runId)
+    }
+
+    def checkInactiveQueryStatus(numInactives: Int, targetInactives: Seq[UUID]): Unit = {
+      eventually(timeout(10.seconds)) {
+        val inactiveQueries = queryStore.allQueryUIData.filter(!_.summary.isActive)
+        assert(inactiveQueries.size == numInactives)
+        assert(inactiveQueries.map(_.summary.id).toSet == targetInactives.toSet)
+      }
+    }
+
+    val (id1, runId1) = addNewQuery()
+    val (id2, runId2) = addNewQuery()
+    val (id3, runId3) = addNewQuery()
+    assert(queryStore.allQueryUIData.count(!_.summary.isActive) == 0)
+
+    val terminateEvent1 = new StreamingQueryListener.QueryTerminatedEvent(id1, runId1, None)
+    listener.onQueryTerminated(terminateEvent1)
+    checkInactiveQueryStatus(1, Seq(id1))
+    val terminateEvent2 = new StreamingQueryListener.QueryTerminatedEvent(id2, runId2, None)
+    listener.onQueryTerminated(terminateEvent2)
+    checkInactiveQueryStatus(2, Seq(id1, id2))
+    val terminateEvent3 = new StreamingQueryListener.QueryTerminatedEvent(id3, runId3, None)
+    listener.onQueryTerminated(terminateEvent3)
+    checkInactiveQueryStatus(2, Seq(id2, id3))
+  }
+
+  test("test small retained progress") {
+    val kvStore = new ElementTrackingStore(new InMemoryStore(), sparkConf)
+    val conf = spark.sparkContext.conf
+    conf.set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "5")
+    val listener = new StreamingQueryStatusListener(conf, kvStore)
+    val queryStore = new StreamingQueryStatusStore(kvStore)
+
+    val id = UUID.randomUUID()
+    val runId = UUID.randomUUID()
+    val startEvent = new StreamingQueryListener.QueryStartedEvent(
+      id, runId, "test", "2016-12-05T20:54:20.827Z")
+    listener.onQueryStarted(startEvent)
+
+    var batchId: Int = 0
+
+    def addQueryProgress(): Unit = {
+      val progress = mockProgressData(id, runId)
+      val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
+      listener.onQueryProgress(processEvent)
+    }
+
+    def mockProgressData(id: UUID, runId: UUID): StreamingQueryProgress = {
+      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      format.setTimeZone(getTimeZone("UTC"))
+
+      val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+      when(progress.id).thenReturn(id)
+      when(progress.runId).thenReturn(runId)
+      when(progress.timestamp).thenReturn(format.format(new Date(System.currentTimeMillis())))
+      when(progress.inputRowsPerSecond).thenReturn(10.0)
+      when(progress.processedRowsPerSecond).thenReturn(12.0)
+      when(progress.batchId).thenReturn(batchId)
+      when(progress.prettyJson).thenReturn("""{"a":1}""")
+
+      batchId += 1
+      progress
+    }
+
+    def checkQueryProcessData(targetNum: Int): Unit = {
+      eventually(timeout(10.seconds)) {
+        assert(queryStore.getQueryProgressData(runId).size == targetNum)
+      }
+    }
+
+    Array.tabulate(4) { _ => addQueryProgress() }
+    checkQueryProcessData(4)
+    addQueryProgress()
+    checkQueryProcessData(5)
+    addQueryProgress()
+    checkQueryProcessData(5)
   }
 }
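
Both retention tests above turn the limits down via static SQL confs; the same
knobs bound how much streaming state the UI keeps in a real deployment. An
illustrative tuning, using the same constants the tests reference (static
confs must be set before the session starts):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.internal.StaticSQLConf

    // Illustrative values only: cap retained queries and per-query progress rows.
    val conf = new SparkConf()
      .set(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES.key, "200")
      .set(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES.key, "200")
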

