You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/07/30 04:10:47 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #869] Add Kyuubi
Query Engine for Spark UI with basic stats
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6c3b722 [KYUUBI #869] Add Kyuubi Query Engine for Spark UI with basic stats
6c3b722 is described below
commit 6c3b722174e0d046ee7701b27249fcf3f4267a92
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Jul 30 12:10:36 2021 +0800
[KYUUBI #869] Add Kyuubi Query Engine for Spark UI with basic stats
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
This PR adds the Kyuubi Query Engine tab to the Spark UI. It currently gathers only basic stats; more details will be added in follow-up PRs.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
![image](https://user-images.githubusercontent.com/8326978/127176837-d0bb6433-c5fc-4402-a2f2-76131332fdf6.png)
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request
Closes #869 from yaooqinn/ui.
Closes #869
aaba5f02 [Kent Yao] Merge branch 'master' of github.com:apache/incubator-kyuubi into ui
0a80a352 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
ff877fe6 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
f2e2cabb [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
7f2b5931 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
cff28784 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 10 ++--
.../org/apache/spark/kyuubi/ui/EnginePage.scala | 65 +++++++++++++++++++++
.../org/apache/spark/kyuubi/ui/EngineTab.scala | 31 +++++-----
.../apache/spark/kyuubi/ui/EngineTabSuite.scala | 68 ++++++++++++++++++++++
.../src/main/scala/org/apache/kyuubi/Utils.scala | 2 +-
.../apache/kyuubi/operation/OperationManager.scala | 2 +
.../org/apache/kyuubi/service/Serverable.scala | 2 +-
.../org/apache/kyuubi/session/SessionManager.scala | 5 +-
.../org/apache/kyuubi/service/NoopServer.scala | 2 +-
.../org/apache/kyuubi/server/KyuubiServer.scala | 2 +-
10 files changed, 161 insertions(+), 28 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index edc4369..c8e6834 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.SparkSQLEngineListener
+import org.apache.spark.kyuubi.ui.EngineTab
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.Logging
@@ -33,12 +34,8 @@ import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, Servi
import org.apache.kyuubi.service.{Serverable, Service}
import org.apache.kyuubi.util.SignalRegister
-private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
- extends Serverable(name) {
-
- def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
-
- override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
+case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
+ override val backendService = new SparkSQLBackendService(spark)
override protected def supportsServiceDiscovery: Boolean = {
ServiceDiscovery.supportServiceDiscovery(conf)
}
@@ -124,6 +121,7 @@ object SparkSQLEngine extends Logging {
// Stop engine before SparkContext stopped to avoid calling a stopped SparkContext
addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
currentEngine = Some(engine)
+ EngineTab(engine)
engine
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
new file mode 100644
index 0000000..1443d5e
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.ui
+
+import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.UIUtils.formatDurationVerbose
+
+case class EnginePage(parent: EngineTab) extends WebUIPage("") {
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val content = generateBasicStats() ++
+ <br/> ++
+ <h4>
+ {parent.engine.backendService.sessionManager.getOpenSessionCount} session(s) are online,
+ running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
+ operations
+ </h4>
+ UIUtils.headerSparkPage(request, parent.name, content, parent)
+ }
+
+ private def generateBasicStats(): Seq[Node] = {
+ val timeSinceStart = System.currentTimeMillis() - parent.engine.getStartTime
+ <ul class ="list-unstyled">
+ <li>
+ <strong>Started at: </strong>
+ {new Date(parent.engine.getStartTime)}
+ </li>
+ <li>
+ <strong>Latest Logout at: </strong>
+ {new Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
+ </li>
+ <li>
+ <strong>Time since start: </strong>
+ {formatDurationVerbose(timeSinceStart)}
+ </li>
+ <li>
+ <strong>Background execution pool threads alive: </strong>
+ {parent.engine.backendService.sessionManager.getExecPoolSize}
+ </li>
+ <li>
+ <strong>Background execution pool threads active: </strong>
+ {parent.engine.backendService.sessionManager.getActiveCount}
+ </li>
+ </ul>
+ }
+}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
similarity index 56%
copy from kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
copy to externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
index c085177..a9d8de9 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
@@ -15,25 +15,24 @@
* limitations under the License.
*/
-package org.apache.kyuubi.service
+package org.apache.spark.kyuubi.ui
-import org.apache.kyuubi.KyuubiException
+import org.apache.spark.ui.SparkUITab
-class NoopServer extends Serverable("noop") {
- override private[kyuubi] val backendService = new NoopBackendService
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.spark.SparkSQLEngine
- override def start(): Unit = {
- super.start()
- if (getConf.getOption("kyuubi.test.server.should.fail").exists(_.toBoolean)) {
- throw new IllegalArgumentException("should fail")
- }
- }
-
- override protected def stopServer(): Unit = {
- throw new KyuubiException("no need to stop me")
- }
+/**
+ * Note that [[SparkUITab]] is private for Spark
+ */
+case class EngineTab(engine: SparkSQLEngine)
+ extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") {
+ override val name: String = "Kyuubi Query Engine"
- override val discoveryService: Service = backendService
- override protected val supportsServiceDiscovery: Boolean = false
+ engine.spark.sparkContext.ui.foreach { ui =>
+ this.attachPage(EnginePage(this))
+ ui.attachTab(this)
+ Utils.addShutdownHook(() => ui.detachTab(this))
+ }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
new file mode 100644
index 0000000..3598b3d
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.ui
+
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+import org.apache.spark.SparkContext
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.JDBCTestUtils
+
+class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
+ override def withKyuubiConf: Map[String, String] = Map(
+ "spark.ui.enabled" -> "true",
+ "spark.ui.port" -> "0")
+
+ override def beforeAll(): Unit = {
+ SparkContext.getActive.foreach(_.stop())
+ super.beforeAll()
+ }
+
+ test("basic stats for engine tab") {
+ assert(spark.sparkContext.ui.nonEmpty)
+ val client = HttpClients.createDefault()
+ val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
+ val response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ val resp = EntityUtils.toString(response.getEntity)
+ assert(resp.contains("<strong>Background execution pool threads alive: </strong>"))
+ assert(resp.contains("0 session(s) are online,"))
+ withJdbcStatement() { statement =>
+ statement.execute(
+ """
+ |SELECT
+ | l.id % 100 k,
+ | sum(l.id) sum,
+ | count(l.id) cnt,
+ | avg(l.id) avg,
+ | min(l.id) min,
+ | max(l.id) max
+ |from range(0, 100000L, 1, 100) l
+ | left join range(0, 100000L, 2, 100) r ON l.id = r.id
+ |GROUP BY 1""".stripMargin)
+ val response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ val resp = EntityUtils.toString(response.getEntity)
+ assert(resp.contains("1 session(s) are online,"))
+ }
+ }
+
+ override protected def jdbcUrl: String = getJdbcUrl
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 3282fa5..c1860f3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.util.ShutdownHookManager
import org.apache.kyuubi.config.internal.Tests.IS_TESTING
-private[kyuubi] object Utils extends Logging {
+object Utils extends Logging {
import org.apache.kyuubi.config.KyuubiConf._
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 4fadc1e..7bd7670 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -36,6 +36,8 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
private final val handleToOperation = new java.util.HashMap[OperationHandle, Operation]()
+ def getOperationCount: Int = handleToOperation.size()
+
override def initialize(conf: KyuubiConf): Unit = {
LogDivertAppender.initialize()
super.initialize(conf)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
index 4a43264..254338d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
@@ -26,7 +26,7 @@ abstract class Serverable(name: String) extends CompositeService(name) {
private val OOMHook = new Runnable { override def run(): Unit = stop() }
private val started = new AtomicBoolean(false)
- private[kyuubi] val backendService: AbstractBackendService
+ val backendService: AbstractBackendService
private lazy val frontendService = new FrontendService(backendService, OOMHook)
protected def supportsServiceDiscovery: Boolean
val discoveryService: Service
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 6fbd80d..d6a79ef 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -45,6 +45,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
def latestLogoutTime: Long = _latestLogoutTime
private val handleToSession = new ConcurrentHashMap[SessionHandle, Session]
+
private val timeoutChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
@@ -87,12 +88,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
def getOpenSessionCount: Int = handleToSession.size()
- protected def getExecPoolSize: Int = {
+ def getExecPoolSize: Int = {
assert(execPool != null)
execPool.getPoolSize
}
- protected def getActiveCount: Int = {
+ def getActiveCount: Int = {
assert(execPool != null)
execPool.getActiveCount
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
index c085177..9049cb8 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.service
import org.apache.kyuubi.KyuubiException
class NoopServer extends Serverable("noop") {
- override private[kyuubi] val backendService = new NoopBackendService
+ override val backendService = new NoopBackendService
override def start(): Unit = {
super.start()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 08905ce..67169a5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -81,7 +81,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
def this() = this(classOf[KyuubiServer].getSimpleName)
- override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
+ override val backendService: AbstractBackendService = new KyuubiBackendService()
override protected def supportsServiceDiscovery: Boolean = {
ServiceDiscovery.supportServiceDiscovery(conf)
}