You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2021/10/22 06:52:45 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d6cefb1 [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
d6cefb1 is described below
commit d6cefb1b9c67fb266be426717120c8151b13b385
Author: timothy65535 <ti...@163.com>
AuthorDate: Fri Oct 22 14:52:35 2021 +0800
[KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Add Statement stats on Kyuubi Query Engine Page
![image](https://user-images.githubusercontent.com/86483005/137263757-398a378f-e0b8-498f-b482-60eef650c0ff.png)
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #1234 from timothy65535/ky-1188.
Closes #1188
6e0ac523 [timothy65535] trigger rebuild
2b2b2a03 [timothy65535] update patch
f0431359 [timothy65535] [KYUUBI #1188] Add Statement stats on Kyuubi Query Engine Page
Authored-by: timothy65535 <ti...@163.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
docs/deployment/settings.md | 1 +
.../engine/spark/events/EngineEventsStore.scala | 56 +++++-
.../engine/spark/events/SparkStatementEvent.scala | 10 ++
.../engine/spark/operation/ExecuteStatement.scala | 5 +-
.../spark/kyuubi/SparkSQLEngineListener.scala | 11 +-
.../org/apache/spark/kyuubi/ui/EnginePage.scala | 189 ++++++++++++++++++++-
.../spark/events/EngineEventsStoreSuite.scala | 52 +++++-
.../apache/spark/kyuubi/ui/EngineTabSuite.scala | 36 ++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 +
9 files changed, 356 insertions(+), 12 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index e539b66..402660b 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -189,6 +189,7 @@ kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap:
kyuubi\.engine\.share<br>\.level\.subdomain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper sub path. For example, for `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End- [...]
kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.ui<br>\.retainedSessions|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of SQL client sessions kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.engine\.ui<br>\.retainedStatements|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The number of statements kept in the Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, allows Kyuubi engine to be killed from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
index dd52635..64030ec 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
/**
* A memory store that tracking the number of statements and sessions, it provides:
@@ -32,7 +32,6 @@ import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
* 1). remove the finished events first.
* 2). remove the active events if still reach the threshold.
*
- * // TODO KYUUBI #983 this store will be used in the third task.
*/
class EngineEventsStore(conf: KyuubiConf) {
@@ -42,6 +41,11 @@ class EngineEventsStore(conf: KyuubiConf) {
private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
/**
+ * The number of statements kept in the Kyuubi Query Engine web UI.
+ */
+ private val retainedStatements: Int = conf.get(ENGINE_UI_STATEMENT_LIMIT)
+
+ /**
* store all session events.
*/
val sessions = new ConcurrentHashMap[String, SessionEvent]
@@ -74,7 +78,7 @@ class EngineEventsStore(conf: KyuubiConf) {
val reverseSeq = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
// remove finished sessions first.
- for (event <- reverseSeq if event.endTime != 0L && countToDelete > 0) {
+ for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
sessions.remove(event.sessionId)
countToDelete -= 1
}
@@ -85,5 +89,51 @@ class EngineEventsStore(conf: KyuubiConf) {
countToDelete -= 1
}
}
+
+ /**
+ * store all statements events.
+ */
+ val statements = new ConcurrentHashMap[String, SparkStatementEvent]
+
+ /**
+ * get all statement events ordered by createTime
+ */
+ def getStatementList: Seq[SparkStatementEvent] = {
+ statements.values().asScala.toSeq.sortBy(_.createTime)
+ }
+
+ def getStatement(statementId: String): Option[SparkStatementEvent] = {
+ Option(statements.get(statementId))
+ }
+
+ /**
+ * save statement events and check the capacity threshold
+ */
+ def saveStatement(statementEvent: SparkStatementEvent): Unit = {
+ statements.put(statementEvent.statementId, statementEvent)
+ checkStatementCapacity()
+ }
+
+  /**
+   * Evict statement events once the store holds more than `retainedStatements` entries.
+   *
+   * Candidates are visited from the largest `createTime` to the smallest. Finished
+   * events (`endTime != -1L`) are evicted first; active events are evicted only if
+   * the store is still over the limit afterwards.
+   */
+  private def checkStatementCapacity(): Unit = {
+    var countToDelete = statements.size - retainedStatements
+
+    val reverseSeq = statements.values().asScala.toSeq.sortBy(_.createTime).reverse
+
+    // remove finished statements first.
+    for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
+      if (statements.remove(event.statementId) != null) {
+        countToDelete -= 1
+      }
+    }
+
+    // remove active events if the threshold is still exceeded. `reverseSeq` is a snapshot
+    // that still lists the finished events evicted above, so only count an eviction when
+    // `remove` actually dropped an entry; otherwise the store would stay over the limit.
+    for (event <- reverseSeq if countToDelete > 0) {
+      if (statements.remove(event.statementId) != null) {
+        countToDelete -= 1
+      }
+    }
+  }
+
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
index c650956..e055eff 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkStatementEvent.scala
@@ -34,6 +34,7 @@ import org.apache.kyuubi.Utils
* @param exception: caught exception if have
*/
case class SparkStatementEvent(
+ username: String,
statementId: String,
statement: String,
appId: String,
@@ -41,10 +42,19 @@ case class SparkStatementEvent(
createTime: Long,
var state: String,
var stateTime: Long,
+ var endTime: Long = -1L,
var queryExecution: String = "",
var exception: String = "") extends KyuubiSparkEvent {
override def schema: StructType = Encoders.product[SparkStatementEvent].schema
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
+
+ def duration: Long = {
+ if (endTime == -1L) {
+ System.currentTimeMillis - createTime
+ } else {
+ endTime - createTime
+ }
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 2e2eb31..800f0e8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -62,7 +62,7 @@ class ExecuteStatement(
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
val statementEvent: SparkStatementEvent = SparkStatementEvent(
- statementId, statement, spark.sparkContext.applicationId,
+ session.user, statementId, statement, spark.sparkContext.applicationId,
session.handle.identifier.toString, lastAccessTime, state.toString, lastAccessTime)
EventLoggingService.onEvent(statementEvent)
@@ -178,6 +178,9 @@ class ExecuteStatement(
super.setState(newState)
statementEvent.state = newState.toString
statementEvent.stateTime = lastAccessTime
+ if (newState == OperationState.ERROR || newState == OperationState.FINISHED) {
+ statementEvent.endTime = System.currentTimeMillis()
+ }
EventLoggingService.onEvent(statementEvent)
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index ef767aa..79e0246 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent}
+import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent}
import org.apache.kyuubi.service.{Serverable, ServiceState}
/**
@@ -117,12 +117,17 @@ class SparkSQLEngineListener(
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case e: SessionEvent => updateSession(e)
+ case e: SessionEvent => updateSessionStore(e)
+ case e: SparkStatementEvent => updateStatementStore(e)
case _ => // Ignore
}
}
- private def updateSession(event: SessionEvent): Unit = {
+ private def updateSessionStore(event: SessionEvent): Unit = {
store.saveSession(event)
}
+
+ private def updateStatementStore(event: SparkStatementEvent): Unit = {
+ store.saveStatement(event)
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
index 1c80dd7..07dd99e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
@@ -25,11 +25,13 @@ import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.xml.{Node, Unparsed}
+import org.apache.commons.text.StringEscapeUtils
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.ui.UIUtils._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.events.SessionEvent
+import org.apache.kyuubi.engine.spark.events.SparkStatementEvent
case class EnginePage(parent: EngineTab) extends WebUIPage("") {
private val store = parent.store
@@ -45,7 +47,8 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
operations
</h4> ++
- generateSessionStatsTable(request)
+ generateSessionStatsTable(request) ++
+ generateStatementStatsTable(request)
UIUtils.headerSparkPage(request, parent.name, content, parent)
}
@@ -93,6 +96,154 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
}
}
+  /**
+   * Generate stats of statements for the engine.
+   *
+   * Renders a collapsible "SQL Statistics" section holding a paged table of the
+   * statement events in the store, or a placeholder text when the store is empty.
+   */
+  private def generateStatementStatsTable(request: HttpServletRequest): Seq[Node] = {
+
+    // Take one snapshot so the count in the title and the rows of the table agree.
+    val statementList = store.getStatementList
+    val numStatement = statementList.size
+
+    val table = if (numStatement > 0) {
+
+      val sqlTableTag = "sqlstat"
+
+      try {
+        // Parse the page number inside the try: a non-numeric value makes `toInt` throw
+        // NumberFormatException, a subclass of IllegalArgumentException, which is then
+        // rendered as an error box below instead of escaping from this method.
+        val sqlTablePage =
+          Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+
+        Some(new StatementStatsPagedTable(
+          request,
+          parent,
+          statementList,
+          "kyuubi",
+          UIUtils.prependBaseUri(request, parent.basePath),
+          sqlTableTag).table(sqlTablePage))
+      } catch {
+        case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+          Some(<div class="alert alert-error">
+            <p>Error while rendering statement table:</p>
+            <pre>
+              {Utils.stringifyException(e)}
+            </pre>
+          </div>)
+      }
+    } else {
+      None
+    }
+    val content =
+      <span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
+            onClick="collapseTable('collapse-aggregated-sqlstat',
+                'aggregated-sqlstat')">
+        <h4>
+          <span class="collapse-table-arrow arrow-open"></span>
+          <a>SQL Statistics ({numStatement})</a>
+        </h4>
+      </span> ++
+      <div class="aggregated-sqlstat collapsible-table">
+        {table.getOrElse("No statistics have been generated yet.")}
+      </div>
+    content
+  }
+
+ private class StatementStatsPagedTable(
+ request: HttpServletRequest,
+ parent: EngineTab,
+ data: Seq[SparkStatementEvent],
+ subPath: String,
+ basePath: String,
+ sqlStatsTableTag: String) extends PagedTable[SparkStatementEvent] {
+
+ private val (sortColumn, desc, pageSize) =
+ getRequestTableParameters(request, sqlStatsTableTag, "Create Time")
+
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ private val parameterPath =
+ s"$basePath/$subPath/?${getRequestParameterOtherTable(request, sqlStatsTableTag)}"
+
+ override val dataSource = new StatementStatsTableDataSource(data, pageSize, sortColumn, desc)
+
+ override def tableId: String = sqlStatsTableTag
+
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$sqlStatsTableTag.sort=$encodedSortColumn" +
+ s"&$sqlStatsTableTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$sqlStatsTableTag"
+ }
+
+ override def pageSizeFormField: String = s"$sqlStatsTableTag.pageSize"
+
+ override def pageNumberFormField: String = s"$sqlStatsTableTag.page"
+
+ override def goButtonFormPath: String =
+ s"$parameterPath&$sqlStatsTableTag.sort=$encodedSortColumn" +
+ s"&$sqlStatsTableTag.desc=$desc#$sqlStatsTableTag"
+
+ override def headers: Seq[Node] = {
+ val sqlTableHeadersAndTooltips: Seq[(String, Boolean, Option[String])] =
+ Seq(
+ ("User", true, None),
+ ("Statement ID", true, None),
+ ("Create Time", true, None),
+ ("Finish Time", true, None),
+ ("Duration", true, None),
+ ("Statement", true, None),
+ ("State", true, None),
+ ("Query Execution", true, None))
+
+ headerStatRow(sqlTableHeadersAndTooltips, desc, pageSize, sortColumn, parameterPath,
+ sqlStatsTableTag, sqlStatsTableTag)
+ }
+
+ override def row(sparkStatementEvent: SparkStatementEvent): Seq[Node] = {
+ <tr>
+ <td>
+ {sparkStatementEvent.username}
+ </td>
+ <td>
+ {sparkStatementEvent.statementId}
+ </td>
+ <td >
+ {formatDate(sparkStatementEvent.createTime)}
+ </td>
+ <td>
+ {if (sparkStatementEvent.endTime > 0) formatDate(sparkStatementEvent.endTime)}
+ </td>
+ <td >
+ {formatDurationVerbose(sparkStatementEvent.duration)}
+ </td>
+ <td>
+ <span class="description-input">
+ {sparkStatementEvent.statement}
+ </span>
+ </td>
+ <td>
+ {sparkStatementEvent.state}
+ </td>
+ {errorMessageCell(sparkStatementEvent.queryExecution)}
+ </tr>
+ }
+
+ private def errorMessageCell(errorMessage: String): Seq[Node] = {
+ val isMultiline = errorMessage.indexOf('\n') >= 0
+ val errorSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ errorMessage.substring(0, errorMessage.indexOf('\n'))
+ } else {
+ errorMessage
+ })
+ val details = detailsUINode(isMultiline, errorMessage)
+ <td>
+ {errorSummary}{details}
+ </td>
+ }
+
+ }
+
/** Generate stats of sessions for the engine */
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
val numSessions = store.getSessionList.size
@@ -333,3 +484,39 @@ private class SessionStatsTableDataSource(
}
}
}
+
+/**
+ * Data source backing the statement statistics table: sorts the given statement
+ * events once by the requested column and serves them page by page.
+ *
+ * @param info statement events to display
+ * @param pageSize number of rows per page
+ * @param sortColumn header title of the column to sort by
+ * @param desc whether to sort in descending order
+ */
+private class StatementStatsTableDataSource(
+    info: Seq[SparkStatementEvent],
+    pageSize: Int,
+    sortColumn: String,
+    desc: Boolean) extends PagedDataSource[SparkStatementEvent](pageSize) {
+
+  // Sorting SparkStatementEvent data
+  private val data = info.sorted(ordering(sortColumn, desc))
+
+  override def dataSize: Int = data.size
+
+  override def sliceData(from: Int, to: Int): Seq[SparkStatementEvent] = data.slice(from, to)
+
+  /**
+   * Return Ordering according to sortColumn and desc.
+   *
+   * @throws IllegalArgumentException if sortColumn is not one of the column titles below
+   */
+  private def ordering(sortColumn: String, desc: Boolean): Ordering[SparkStatementEvent] = {
+    val ordering: Ordering[SparkStatementEvent] = sortColumn match {
+      case "User" => Ordering.by(_.username)
+      case "Statement ID" => Ordering.by(_.statementId)
+      case "Create Time" => Ordering.by(_.createTime)
+      case "Finish Time" => Ordering.by(_.endTime)
+      case "Duration" => Ordering.by(_.duration)
+      case "Statement" => Ordering.by(_.statement)
+      case "State" => Ordering.by(_.state)
+      case "Query Execution" => Ordering.by(_.queryExecution)
+      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+    }
+    if (desc) {
+      ordering.reverse
+    } else {
+      ordering
+    }
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
index 46da9c7..9166cd8 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.events
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
class EngineEventsStoreSuite extends KyuubiFunSuite {
@@ -58,10 +58,10 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
val store = new EngineEventsStore(conf)
- store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L))
- store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L))
+ store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, -1L))
+ store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, -1L))
store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L))
- store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, 0L))
+ store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, -1L))
assert(store.getSessionList.size == 3)
assert(store.getSessionList(2).sessionId == "s4")
@@ -79,4 +79,48 @@ class EngineEventsStoreSuite extends KyuubiFunSuite {
assert(store.getSession("abc").get.endTime == finishTimestamp)
}
+ test("ensure that the statements are stored in order") {
+ val store = new EngineEventsStore(KyuubiConf())
+
+ val s1 = SparkStatementEvent("a", "ea1", "select 1", "app1", "sid1", 1L, "RUNNING", 2L)
+ val s2 = SparkStatementEvent("c", "ea2", "select 2", "app2", "sid1", 2L, "RUNNING", 2L)
+ val s3 = SparkStatementEvent("b", "ea3", "select 3", "app3", "sid1", 3L, "RUNNING", 2L)
+
+ store.saveStatement(s1)
+ store.saveStatement(s2)
+ store.saveStatement(s3)
+
+ assert(store.getStatementList.size == 3)
+ assert(store.getStatementList.head.statementId == "ea1")
+ assert(store.getStatementList.last.statementId == "ea3")
+ }
+
+ test("test drop statements when reach the threshold ") {
+ val conf = KyuubiConf()
+ conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
+
+ val store = new EngineEventsStore(conf)
+ for (i <- 1 to 5) {
+ val s = SparkStatementEvent("a", s"ea1${i}", "select 1", "app1", "sid1", 1L, "RUNNING", 2L)
+ store.saveStatement(s)
+ }
+
+ assert(store.getStatementList.size == 3)
+ }
+
+ test("test drop statements when reach the threshold, and try to keep active events.") {
+ val conf = KyuubiConf()
+ conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
+
+ val store = new EngineEventsStore(conf)
+
+ store.saveStatement(SparkStatementEvent("a", "s1", "select 1", "a1", "si1", 1L, "RUNNING", -1L))
+ store.saveStatement(SparkStatementEvent("a", "s2", "select 1", "a2", "si1", 2L, "RUNNING", -1L))
+ store.saveStatement(SparkStatementEvent("a", "s3", "1", "a3", "si1", 3L, "ERROR", 3L, 3L))
+ store.saveStatement(SparkStatementEvent("a", "s4", "select 1", "a4", "si1", 4L, "RUNNING", -1L))
+
+ assert(store.getStatementList.size == 3)
+ assert(store.getStatementList(2).statementId == "s4")
+ }
+
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
index 9f759a7..717c9fa 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
@@ -100,5 +100,41 @@ class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
}
}
+ test("statement stats for engine tab") {
+ assert(spark.sparkContext.ui.nonEmpty)
+ val client = HttpClients.createDefault()
+ val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
+ val response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ val resp = EntityUtils.toString(response.getEntity)
+ assert(resp.contains("0 session(s) are online,"))
+ withJdbcStatement() { statement =>
+ statement.execute(
+ """
+ |SELECT
+ | l.id % 100 k,
+ | sum(l.id) sum,
+ | count(l.id) cnt,
+ | avg(l.id) avg,
+ | min(l.id) min,
+ | max(l.id) max
+ |from range(0, 100000L, 1, 100) l
+ | left join range(0, 100000L, 2, 100) r ON l.id = r.id
+ |GROUP BY 1""".stripMargin)
+ val response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ val resp = EntityUtils.toString(response.getEntity)
+
+ // check statement section
+ assert(resp.contains("SQL Statistics"))
+
+ // check sql stats table id
+ assert(resp.contains("sqlstat"))
+
+ // check sql stats table title
+ assert(resp.contains("Query Execution"))
+ }
+ }
+
override protected def jdbcUrl: String = getJdbcUrl
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0628549..e035550 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -902,6 +902,14 @@ object KyuubiConf {
.checkValue(_ > 0, "retained sessions must be positive.")
.createWithDefault(200)
+ val ENGINE_UI_STATEMENT_LIMIT: ConfigEntry[Int] =
+ buildConf("engine.ui.retainedStatements")
+ .doc("The number of statements kept in the Kyuubi Query Engine web UI.")
+ .version("1.4.0")
+ .intConf
+ .checkValue(_ > 0, "retained statements must be positive.")
+ .createWithDefault(200)
+
val ENGINE_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
buildConf("engine.operation.log.dir.root")
.doc("Root directory for query operation log at engine-side.")