You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/07/30 04:10:47 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #869] Add Kyuubi Query Engine for Spark UI with basic stats

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c3b722  [KYUUBI #869] Add Kyuubi Query Engine for Spark UI with basic stats
6c3b722 is described below

commit 6c3b722174e0d046ee7701b27249fcf3f4267a92
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Jul 30 12:10:36 2021 +0800

    [KYUUBI #869] Add Kyuubi Query Engine for Spark UI with basic stats
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    This PR adds the Kyuubi Query Engine tab on Spark UI, it gathers basic stats currently. We will put more details in the following PRs
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    ![image](https://user-images.githubusercontent.com/8326978/127176837-d0bb6433-c5fc-4402-a2f2-76131332fdf6.png)
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
    
    Closes #869 from yaooqinn/ui.
    
    Closes #869
    
    aaba5f02 [Kent Yao] Merge branch 'master' of github.com:apache/incubator-kyuubi into ui
    0a80a352 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
    ff877fe6 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
    f2e2cabb [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
    7f2b5931 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
    cff28784 [Kent Yao] Add Kyuubi Query Engine for Spark UI with basic stats
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 10 ++--
 .../org/apache/spark/kyuubi/ui/EnginePage.scala    | 65 +++++++++++++++++++++
 .../org/apache/spark/kyuubi/ui/EngineTab.scala     | 31 +++++-----
 .../apache/spark/kyuubi/ui/EngineTabSuite.scala    | 68 ++++++++++++++++++++++
 .../src/main/scala/org/apache/kyuubi/Utils.scala   |  2 +-
 .../apache/kyuubi/operation/OperationManager.scala |  2 +
 .../org/apache/kyuubi/service/Serverable.scala     |  2 +-
 .../org/apache/kyuubi/session/SessionManager.scala |  5 +-
 .../org/apache/kyuubi/service/NoopServer.scala     |  2 +-
 .../org/apache/kyuubi/server/KyuubiServer.scala    |  2 +-
 10 files changed, 161 insertions(+), 28 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index edc4369..c8e6834 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.SparkConf
 import org.apache.spark.kyuubi.SparkSQLEngineListener
+import org.apache.spark.kyuubi.ui.EngineTab
 import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.Logging
@@ -33,12 +34,8 @@ import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies, Servi
 import org.apache.kyuubi.service.{Serverable, Service}
 import org.apache.kyuubi.util.SignalRegister
 
-private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
-  extends Serverable(name) {
-
-  def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
-
-  override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
+case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
+  override val backendService = new SparkSQLBackendService(spark)
   override protected def supportsServiceDiscovery: Boolean = {
     ServiceDiscovery.supportServiceDiscovery(conf)
   }
@@ -124,6 +121,7 @@ object SparkSQLEngine extends Logging {
     // Stop engine before SparkContext stopped to avoid calling a stopped SparkContext
     addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
     currentEngine = Some(engine)
+    EngineTab(engine)
     engine
   }
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
new file mode 100644
index 0000000..1443d5e
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EnginePage.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.ui
+
+import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.UIUtils.formatDurationVerbose
+
+case class EnginePage(parent: EngineTab) extends WebUIPage("") {
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val content = generateBasicStats() ++
+        <br/> ++
+      <h4>
+        {parent.engine.backendService.sessionManager.getOpenSessionCount} session(s) are online,
+        running {parent.engine.backendService.sessionManager.operationManager.getOperationCount}
+        operations
+      </h4>
+    UIUtils.headerSparkPage(request, parent.name, content, parent)
+  }
+
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - parent.engine.getStartTime
+    <ul class ="list-unstyled">
+      <li>
+        <strong>Started at: </strong>
+        {new Date(parent.engine.getStartTime)}
+      </li>
+      <li>
+        <strong>Latest Logout at: </strong>
+        {new Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
+      </li>
+      <li>
+        <strong>Time since start: </strong>
+        {formatDurationVerbose(timeSinceStart)}
+      </li>
+      <li>
+        <strong>Background execution pool threads alive: </strong>
+        {parent.engine.backendService.sessionManager.getExecPoolSize}
+      </li>
+      <li>
+        <strong>Background execution pool threads active: </strong>
+        {parent.engine.backendService.sessionManager.getActiveCount}
+      </li>
+    </ul>
+  }
+}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
similarity index 56%
copy from kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
copy to externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
index c085177..a9d8de9 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineTab.scala
@@ -15,25 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.service
+package org.apache.spark.kyuubi.ui
 
-import org.apache.kyuubi.KyuubiException
+import org.apache.spark.ui.SparkUITab
 
-class NoopServer extends Serverable("noop") {
-  override private[kyuubi] val backendService = new NoopBackendService
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.spark.SparkSQLEngine
 
-  override def start(): Unit = {
-    super.start()
-    if (getConf.getOption("kyuubi.test.server.should.fail").exists(_.toBoolean)) {
-      throw new IllegalArgumentException("should fail")
-    }
-  }
-
-  override protected def stopServer(): Unit = {
-    throw new KyuubiException("no need to stop me")
-  }
+/**
+ * Note that [[SparkUITab]] is private for Spark
+ */
+case class EngineTab(engine: SparkSQLEngine)
+  extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") {
 
+  override val name: String = "Kyuubi Query Engine"
 
-  override val discoveryService: Service = backendService
-  override protected val supportsServiceDiscovery: Boolean = false
+  engine.spark.sparkContext.ui.foreach { ui =>
+    this.attachPage(EnginePage(this))
+    ui.attachTab(this)
+    Utils.addShutdownHook(() => ui.detachTab(this))
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
new file mode 100644
index 0000000..3598b3d
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ui/EngineTabSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.ui
+
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+import org.apache.spark.SparkContext
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.JDBCTestUtils
+
+class EngineTabSuite extends WithSparkSQLEngine with JDBCTestUtils {
+  override def withKyuubiConf: Map[String, String] = Map(
+    "spark.ui.enabled" -> "true",
+    "spark.ui.port" -> "0")
+
+  override def beforeAll(): Unit = {
+    SparkContext.getActive.foreach(_.stop())
+    super.beforeAll()
+  }
+
+  test("basic stats for engine tab") {
+    assert(spark.sparkContext.ui.nonEmpty)
+    val client = HttpClients.createDefault()
+    val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
+    val response = client.execute(req)
+    assert(response.getStatusLine.getStatusCode === 200)
+    val resp = EntityUtils.toString(response.getEntity)
+    assert(resp.contains("<strong>Background execution pool threads alive: </strong>"))
+    assert(resp.contains("0 session(s) are online,"))
+    withJdbcStatement() { statement =>
+      statement.execute(
+        """
+          |SELECT
+          |  l.id % 100 k,
+          |  sum(l.id) sum,
+          |  count(l.id) cnt,
+          |  avg(l.id) avg,
+          |  min(l.id) min,
+          |  max(l.id) max
+          |from range(0, 100000L, 1, 100) l
+          |  left join range(0, 100000L, 2, 100) r ON l.id = r.id
+          |GROUP BY 1""".stripMargin)
+      val response = client.execute(req)
+      assert(response.getStatusLine.getStatusCode === 200)
+      val resp = EntityUtils.toString(response.getEntity)
+      assert(resp.contains("1 session(s) are online,"))
+    }
+  }
+
+  override protected def jdbcUrl: String = getJdbcUrl
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 3282fa5..c1860f3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.util.ShutdownHookManager
 
 import org.apache.kyuubi.config.internal.Tests.IS_TESTING
 
-private[kyuubi] object Utils extends Logging {
+object Utils extends Logging {
 
   import org.apache.kyuubi.config.KyuubiConf._
 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 4fadc1e..7bd7670 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -36,6 +36,8 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
 
   private final val handleToOperation = new java.util.HashMap[OperationHandle, Operation]()
 
+  def getOperationCount: Int = handleToOperation.size()
+
   override def initialize(conf: KyuubiConf): Unit = {
     LogDivertAppender.initialize()
     super.initialize(conf)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
index 4a43264..254338d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala
@@ -26,7 +26,7 @@ abstract class Serverable(name: String) extends CompositeService(name) {
   private val OOMHook = new Runnable { override def run(): Unit = stop() }
   private val started = new AtomicBoolean(false)
 
-  private[kyuubi] val backendService: AbstractBackendService
+  val backendService: AbstractBackendService
   private lazy val frontendService = new FrontendService(backendService, OOMHook)
   protected def supportsServiceDiscovery: Boolean
   val discoveryService: Service
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 6fbd80d..d6a79ef 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -45,6 +45,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
   def latestLogoutTime: Long = _latestLogoutTime
 
   private val handleToSession = new ConcurrentHashMap[SessionHandle, Session]
+
   private val timeoutChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
 
@@ -87,12 +88,12 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
 
   def getOpenSessionCount: Int = handleToSession.size()
 
-  protected def getExecPoolSize: Int = {
+  def getExecPoolSize: Int = {
     assert(execPool != null)
     execPool.getPoolSize
   }
 
-  protected def getActiveCount: Int = {
+  def getActiveCount: Int = {
     assert(execPool != null)
     execPool.getActiveCount
   }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
index c085177..9049cb8 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.service
 import org.apache.kyuubi.KyuubiException
 
 class NoopServer extends Serverable("noop") {
-  override private[kyuubi] val backendService = new NoopBackendService
+  override val backendService = new NoopBackendService
 
   override def start(): Unit = {
     super.start()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 08905ce..67169a5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -81,7 +81,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
 
   def this() = this(classOf[KyuubiServer].getSimpleName)
 
-  override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
+  override val backendService: AbstractBackendService = new KyuubiBackendService()
   override protected def supportsServiceDiscovery: Boolean = {
     ServiceDiscovery.supportServiceDiscovery(conf)
   }