You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/10/22 06:52:45 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6cefb1  [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
d6cefb1 is described below

commit d6cefb1b9c67fb266be426717120c8151b13b385
Author: timothy65535 <ti...@163.com>
AuthorDate: Fri Oct 22 14:52:35 2021 +0800

    [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    Add Statement stats on Kyuubi Query Engine Page
    
    ![image](https://user-images.githubusercontent.com/86483005/137263757-398a378f-e0b8-498f-b482-60eef650c0ff.png)
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #1234 from timothy65535/ky-1188.
    
    Closes #1188
    
    6e0ac523 [timothy65535] trigger rebuild
    2b2b2a03 [timothy65535] update patch
    f0431359 [timothy65535] [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
    
    Authored-by: timothy65535 <ti...@163.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 docs/deployment/settings.md                        |   1 +
 .../engine/spark/events/EngineEventsStore.scala    |  56 +++++-
 .../engine/spark/events/SparkStatementEvent.scala  |  10 ++
 .../engine/spark/operation/ExecuteStatement.scala  |   5 +-
 .../spark/kyuubi/SparkSQLEngineListener.scala      |  11 +-
 .../org/apache/spark/kyuubi/ui/EnginePage.scala    | 189 ++++++++++++++++++++-
 .../spark/events/EngineEventsStoreSuite.scala      |  52 +++++-
 .../apache/spark/kyuubi/ui/EngineTabSuite.scala    |  36 ++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |   8 +
 9 files changed, 356 insertions(+), 12 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index e539b66..402660b 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -189,6 +189,7 @@ kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap:
 kyuubi\.engine\.share<br>\.level\.subdomain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper sub path. For example, for `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End- [...]
 kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
 kyuubi\.engine\.ui<br>\.retainedSessions|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of SQL client sessions kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.engine\.ui<br>\.retainedStatements|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of statements kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
 kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
 
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
index dd52635..64030ec 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
 
 /**
  * A memory store that tracking the number of statements and sessions, it provides:
@@ -32,7 +32,6 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
  * 1). remove the finished events first.
  * 2). remove the active events if still reach the threshold.
  *
- * // TODO KYUUBI #983 this store will be used in the third task.
  */
 class EngineEventsStore(conf: KyuubiConf) {
 
@@ -42,6 +41,11 @@ class EngineEventsStore(conf: KyuubiConf) {
   private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
 
   /**
+   * The number of statements kept in the Kyuubi Query Engine web UI.
+   */
+  private val retainedStatements: Int = conf.get(ENGINE_UI_STATEMENT_LIMIT)
+
+  /**
    * store all session events.
    */
   val sessions = new ConcurrentHashMap[String, SessionEvent]
@@ -74,7 +78,7 @@ class EngineEventsStore(conf: KyuubiConf) {
     val reverseSeq = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
 
     // remove finished sessions first.
-    for (event <- reverseSeq if event.endTime != 0L && countToDelete > 0) {
+    for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
       sessions.remove(event.sessionId)
       countToDelete -= 1
     }
@@ -85,5 +89,51 @@ class EngineEventsStore(conf: KyuubiConf) {
       countToDelete -= 1
     }
   }
+
+  /**
+   * store all statement events.
+   */
+  val statements = new ConcurrentHashMap[String, SparkStatementEvent]
+
+  /**
+   * get all statement events ordered by createTime
+   */
+  def getStatementList: Seq[SparkStatementEvent] = {
+    statements.values().asScala.toSeq.sortBy(_.createTime)
+  }
+
+  def getStatement(statementId: String): Option[SparkStatementEvent] = {
+    Option(statements.get(statementId))
+  }
+
+  /**
+   * save statement events and check the capacity threshold
+   */
+  def saveStatement(statementEvent: SparkStatementEvent): Unit = {
+    statements.put(statementEvent.statementId, statementEvent)
+    checkStatementCapacity()
+  }
+
+  /**
+   * clean up the statement events if the threshold is reached
+   */
+  private def checkStatementCapacity(): Unit = {
+    var countToDelete = statements.size - retainedStatements
+
+    val reverseSeq = statements.values().asScala.toSeq.sortBy(_.createTime).reverse
+
+    //  remove finished statements first.
+    for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
+      statements.remove(event.statementId)
+      countToDelete -= 1
+    }
+
+    // remove active events if the threshold is still exceeded
+    for (event <- reverseSeq if countToDelete > 0) {
+      statements.remove(event.statementId)
+      countToDelete -= 1
+    }
+  }
+
 }
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
index c650956..e055eff 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
@@ -34,6 +34,7 @@ import org.apache.kyuubi.Utils
  * @param exception: caught exception if have
  */
 case class SparkStatementEvent(
+    username: String,
     statementId: String,
     statement: String,
     appId: String,
@@ -41,10 +42,19 @@ case class SparkStatementEvent(
     createTime: Long,
     var state: String,
     var stateTime: Long,
+    var endTime: Long = -1L,
     var queryExecution: String = "",
     var exception: String = "") extends KyuubiSparkEvent {
 
   override def schema: StructType = Encoders.product[SparkStatementEvent].schema
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
+
+  def duration: Long = {
+    if (endTime == -1L) {
+      System.currentTimeMillis - createTime
+    } else {
+      endTime - createTime
+    }
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 2e2eb31..800f0e8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -62,7 +62,7 @@ class ExecuteStatement(
   private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
 
   val statementEvent: SparkStatementEvent = SparkStatementEvent(
-    statementId, statement, spark.sparkContext.applicationId,
+    session.user, statementId, statement, spark.sparkContext.applicationId,
     session.handle.identifier.toString, lastAccessTime, state.toString, lastAccessTime)
   EventLoggingService.onEvent(statementEvent)
 
@@ -178,6 +178,9 @@ class ExecuteStatement(
     super.setState(newState)
     statementEvent.state = newState.toString
     statementEvent.stateTime = lastAccessTime
+    if (newState == OperationState.ERROR || newState == OperationState.FINISHED) {
+      statementEvent.endTime = System.currentTimeMillis()
+    }
     EventLoggingService.onEvent(statementEvent)
   }
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index ef767aa..79e0246 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.Utils.stringifyException
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent}
+import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent}
 import org.apache.kyuubi.service.{Serverable, ServiceState}
 
 /**
@@ -117,12 +117,17 @@ class SparkSQLEngineListener(
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
-      case e: SessionEvent => updateSession(e)
+      case e: SessionEvent => updateSessionStore(e)
+      case e: SparkStatementEvent => updateStatementStore(e)
       case _ => // Ignore
     }
   }
 
-  private def updateSession(event: SessionEvent): Unit = {
+  private def updateSessionStore(event: SessionEvent): Unit = {
     store.saveSession(event)
   }
+
+  private def updateStatementStore(event: SparkStatementEvent): Unit = {
+    store.saveStatement(event)
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
index 1c80dd7..07dd99e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
@@ -25,11 +25,13 @@ import javax.servlet.http.HttpServletRequest
 import scala.collection.JavaConverters.mapAsScalaMapConverter
 import scala.xml.{Node, Unparsed}
 
+import org.apache.commons.text.StringEscapeUtils
 import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
 import org.apache.spark.ui.UIUtils._
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.spark.events.SessionEvent
+import org.apache.kyuubi.engine.spark.events.SparkStatementEvent
 
 case class EnginePage(parent: EngineTab) extends WebUIPage("") {
   private val store = parent.store
@@ -45,7 +47,8 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
         running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
         operations
       </h4> ++
-      generateSessionStatsTable(request)
+      generateSessionStatsTable(request) ++
+      generateStatementStatsTable(request)
     UIUtils.headerSparkPage(request, parent.name, content, parent)
   }
 
@@ -93,6 +96,154 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
     }
   }
 
+  /** Generate stats of statements for the engine */
+  private def generateStatementStatsTable(request: HttpServletRequest): Seq[Node] = {
+
+    val numStatement = store.getStatementList.size
+
+    val table = if (numStatement > 0) {
+
+      val sqlTableTag = "sqlstat"
+
+      val sqlTablePage =
+        Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+
+      try {
+        Some(new StatementStatsPagedTable(
+          request,
+          parent,
+          store.getStatementList,
+          "kyuubi",
+          UIUtils.prependBaseUri(request, parent.basePath),
+          sqlTableTag).table(sqlTablePage))
+      } catch {
+        case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+          Some(<div class="alert alert-error">
+            <p>Error while rendering job table:</p>
+            <pre>
+              {Utils.stringifyException(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+    val content =
+      <span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sqlstat',
+                'aggregated-sqlstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>SQL Statistics ({numStatement})</a>
+        </h4>
+      </span> ++
+        <div class="aggregated-sqlstat collapsible-table">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </div>
+    content
+  }
+
+  private class StatementStatsPagedTable(
+      request: HttpServletRequest,
+      parent: EngineTab,
+      data: Seq[SparkStatementEvent],
+      subPath: String,
+      basePath: String,
+      sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] {
+
+    private val (sortColumn, desc, pageSize) =
+      getRequestTableParameters(request, sqlStatsTableTag, "Create Time")
+
+    private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+    private val parameterPath =
+      s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}"
+
+    override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc)
+
+    override def tableId: String = sqlStatsTableTag
+
+    override def tableCssClass: String =
+      "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+    override def pageLink(page: Int): String = {
+      parameterPath +
+        s"&$pageNumberFormField=$page" +
+        s"&$sqlStatsTableTag.sort=$encodedSortColumn" +
+        s"&$sqlStatsTableTag.desc=$desc" +
+        s"&$pageSizeFormField=$pageSize" +
+        s"#$sqlStatsTableTag"
+    }
+
+    override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize"
+
+    override def pageNumberFormField: String = s"$sqlStatsTableTag.page"
+
+    override def goButtonFormPath: String =
+      s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" +
+        s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag"
+
+    override def headers: Seq[Node] = {
+      val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
+        Seq(
+          ("User", true, None),
+          ("Statement ID", true, None),
+          ("Create Time", true, None),
+          ("Finish Time", true, None),
+          ("Duration", true, None),
+          ("Statement", true, None),
+          ("State", true, None),
+          ("Query Execution", true, None))
+
+      headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath,
+        sqlStatsTableTag, sqlStatsTableTag)
+    }
+
+    override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = {
+      <tr>
+        <td>
+          {sparkStatementEvent.username}
+        </td>
+        <td>
+          {sparkStatementEvent.statementId}
+        </td>
+        <td >
+          {formatDate(sparkStatementEvent.createTime)}
+        </td>
+        <td>
+          {if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)}
+        </td>
+        <td >
+          {formatDurationVerbose(sparkStatementEvent.duration)}
+        </td>
+        <td>
+          <span class="description-input">
+            {sparkStatementEvent.statement}
+          </span>
+        </td>
+        <td>
+          {sparkStatementEvent.state}
+        </td>
+        {errorMessageCell(sparkStatementEvent.queryExecution)}
+      </tr>
+    }
+
+    private def errorMessageCell(errorMessage: String): Seq[Node] = {
+      val isMultiline = errorMessage.indexOf('\n') >= 0
+      val errorSummary = StringEscapeUtils.escapeHtml4(
+        if (isMultiline) {
+          errorMessage.substring(0, errorMessage.indexOf('\n'))
+        } else {
+          errorMessage
+        })
+      val details = detailsUINode(isMultiline, errorMessage)
+      <td>
+        {errorSummary}{details}
+      </td>
+    }
+
+  }
+
   /** Generate stats of sessions for the engine */
   private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
     val numSessions = store.getSessionList.size
@@ -333,3 +484,39 @@ private class SessionStatsTableDataSource(
     }
   }
 }
+
+private class StatementStatsTableDataSource(
+    info: Seq[SparkStatementEvent],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedDataSource[SparkStatementEvent](pageSize) {
+
+  // Sorting SparkStatementEvent data
+  private val data = info.sorted(ordering(sortColumn, desc))
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[SparkStatementEvent] = data.slice(from, to)
+
+  /**
+   * Return Ordering according to sortColumn and desc.
+   */
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkStatementEvent] = {
+    val ordering: Ordering[SparkStatementEvent] = sortColumn match {
+      case "User" => Ordering.by(_.username)
+      case "Statement ID" => Ordering.by(_.statementId)
+      case "Create Time" => Ordering by (_.createTime)
+      case "Finish Time" => Ordering.by(_.endTime)
+      case "Duration" => Ordering.by(_.duration)
+      case "Statement" => Ordering.by(_.statement)
+      case "State" => Ordering.by(_.state)
+      case "Query Execution" => Ordering.by(_.queryExecution)
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
index 46da9c7..9166cd8 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.events
 
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
 
 class EngineEventsStoreSuite extends KyuubiFunSuite {
 
@@ -58,10 +58,10 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
 
     val store = new EngineEventsStore(conf)
 
-    store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L))
-    store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L))
+    store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, -1L))
+    store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, -1L))
     store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L))
-    store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, 0L))
+    store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, -1L))
 
     assert(store.getSessionList.size == 3)
     assert(store.getSessionList(2).sessionId == "s4")
@@ -79,4 +79,48 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
     assert(store.getSession("abc").get.endTime == finishTimestamp)
   }
 
+  test("ensure that the statements are stored in order") {
+    val store = new EngineEventsStore(KyuubiConf())
+
+    val s1 = SparkStatementEvent("a", "ea1", "select 1", "app1", "sid1", 1L, "RUNNING", 2L)
+    val s2 = SparkStatementEvent("c", "ea2", "select 2", "app2", "sid1", 2L, "RUNNING", 2L)
+    val s3 = SparkStatementEvent("b", "ea3", "select 3", "app3", "sid1", 3L, "RUNNING", 2L)
+
+    store.saveStatement(s1)
+    store.saveStatement(s2)
+    store.saveStatement(s3)
+
+    assert(store.getStatementList.size == 3)
+    assert(store.getStatementList.head.statementId == "ea1")
+    assert(store.getStatementList.last.statementId == "ea3")
+  }
+
+  test("test drop statements when reach the threshold ") {
+    val conf = KyuubiConf()
+    conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
+
+    val store = new EngineEventsStore(conf)
+    for (i <- 1 to 5) {
+      val s = SparkStatementEvent("a", s"ea1${i}", "select 1", "app1", "sid1", 1L, "RUNNING", 2L)
+      store.saveStatement(s)
+    }
+
+    assert(store.getStatementList.size == 3)
+  }
+
+  test("test drop statements when reach the threshold, and try to keep active events.") {
+    val conf = KyuubiConf()
+    conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
+
+    val store = new EngineEventsStore(conf)
+
+    store.saveStatement(SparkStatementEvent("a", "s1", "select 1", "a1", "si1", 1L, "RUNNING", -1L))
+    store.saveStatement(SparkStatementEvent("a", "s2", "select 1", "a2", "si1", 2L, "RUNNING", -1L))
+    store.saveStatement(SparkStatementEvent("a", "s3", "1", "a3", "si1", 3L, "ERROR", 3L, 3L))
+    store.saveStatement(SparkStatementEvent("a", "s4", "select 1", "a4", "si1", 4L, "RUNNING", -1L))
+
+    assert(store.getStatementList.size == 3)
+    assert(store.getStatementList(2).statementId == "s4")
+  }
+
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
index 9f759a7..717c9fa 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
@@ -100,5 +100,41 @@ class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
     }
   }
 
+  test("statement stats for engine tab") {
+    assert(spark.sparkContext.ui.nonEmpty)
+    val client = HttpClients.createDefault()
+    val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
+    val response = client.execute(req)
+    assert(response.getStatusLine.getStatusCode === 200)
+    val resp = EntityUtils.toString(response.getEntity)
+    assert(resp.contains("0 session(s) are online,"))
+    withJdbcStatement() { statement =>
+      statement.execute(
+        """
+          |SELECT
+          |  l.id % 100 k,
+          |  sum(l.id) sum,
+          |  count(l.id) cnt,
+          |  avg(l.id) avg,
+          |  min(l.id) min,
+          |  max(l.id) max
+          |from range(0, 100000L, 1, 100) l
+          |  left join range(0, 100000L, 2, 100) r ON l.id = r.id
+          |GROUP BY 1""".stripMargin)
+      val response = client.execute(req)
+      assert(response.getStatusLine.getStatusCode === 200)
+      val resp = EntityUtils.toString(response.getEntity)
+
+      // check sql statistics section
+      assert(resp.contains("SQL Statistics"))
+
+      // check sql stats table id
+      assert(resp.contains("sqlstat"))
+
+      // check sql stats table title
+      assert(resp.contains("Query Execution"))
+    }
+  }
+
   override protected def jdbcUrl: String = getJdbcUrl
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0628549..e035550 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -902,6 +902,14 @@ object KyuubiConf {
       .checkValue(_ > 0, "retained sessions must be positive.")
       .createWithDefault(200)
 
+  val ENGINE_UI_STATEMENT_LIMIT: ConfigEntry[Int] =
+    buildConf("engine.ui.retainedStatements")
+      .doc("The number of statements kept in the Kyuubi Query Engine web UI.")
+      .version("1.4.0")
+      .intConf
+      .checkValue(_ > 0, "retained statements must be positive.")
+      .createWithDefault(200)
+
   val ENGINE_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
     buildConf("engine.operation.log.dir.root")
       .doc("Root directory for query operation log at engine-side.")