You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/07/15 05:33:48 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3046][Metrics] Add meter metrics for recording the rate of the operation state for each kyuubi operation

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bb06542a [KYUUBI #3046][Metrics] Add meter metrics for recording the rate of the operation state for each kyuubi operation
4bb06542a is described below

commit 4bb06542a2beeecc1d60b7f62bfa9f9f182fcbff
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Fri Jul 15 13:33:41 2022 +0800

    [KYUUBI #3046][Metrics] Add meter metrics for recording the rate of the operation state for each kyuubi operation
    
    ### _Why are the changes needed?_
    
    Close #3046
    Expose metrics like:
    
    ```
    kyuubi.operation.state.BatchJobSubmission.pending
    kyuubi.operation.state.BatchJobSubmission.running
    kyuubi.operation.state.BatchJobSubmission.finished
    kyuubi.operation.state.BatchJobSubmission.error
    kyuubi.operation.state.BatchJobSubmission.canceled
    ```
    This lets the Kyuubi service admin know how many BatchJobSubmission operations are pending, running, and so on.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3063 from turboFei/batch_metrics.
    
    Closes #3046
    
    89ee21db [Fei Wang] remove statement overwrite
    25ae41e9 [Fei Wang] update docs
    7287527c [Fei Wang] use meter
    a2dde891 [Fei Wang] save
    a0c6eade [Fei Wang] remove another bug fix
    c250a089 [Fei Wang] add docs
    87f27e89 [Fei Wang] comments
    6bdc4ca9 [Fei Wang] fix flaky test
    b000525d [Fei Wang] fix flaky test
    f748093e [Fei Wang] expose more metrics
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 docs/monitor/metrics.md                            |  1 +
 .../org/apache/kyuubi/metrics/MetricsSystem.scala  |  4 ++
 .../kyuubi/operation/BatchJobSubmission.scala      |  6 +--
 .../apache/kyuubi/operation/KyuubiOperation.scala  |  8 +++-
 .../server/api/v1/BatchesResourceSuite.scala       | 50 +++++++++++++++++++++-
 5 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/docs/monitor/metrics.md b/docs/monitor/metrics.md
index a8bacab09..26c6f98ee 100644
--- a/docs/monitor/metrics.md
+++ b/docs/monitor/metrics.md
@@ -79,6 +79,7 @@ Metrics Prefix | Metrics Suffix | Type | Since | Description
 `kyuubi.backend_service.fetch_result_rows_rate`  | | meter | 1.5.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> kyuubi backend service `fetchResults` method that fetch result rows rate </div>
 `kyuubi.backend_service.get_primary_keys`  | | meter | 1.6.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> kyuubi backend service `get_primary_keys` method execution time and rate </div>
 `kyuubi.backend_service.get_cross_reference`  | | meter | 1.6.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'> kyuubi backend service `get_cross_reference` method execution time and rate </div>
+`kyuubi.operation.state` | `${operationType}`<br/>`.${state}` | meter | 1.6.0 |<div style='width: 150pt;word-wrap: break-word;white-space: normal'>  The rate of `${operationType}` operations in a particular `${state}`, e.g. `BatchJobSubmission.pending`, `BatchJobSubmission.finished`. Note that the terminal states are cumulative, but the intermediate ones are not. </div>
 
 Before v1.5.0, if you use these metrics:
 - `kyuubi.statement.total`
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
index 41ef03710..2507eb773 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
@@ -117,4 +117,8 @@ object MetricsSystem {
   def counterValue(name: String): Option[Long] = {
     maybeSystem.map(_.registry.counter(name).getCount)
   }
+
+  def meterValue(name: String): Option[Long] = {
+    maybeSystem.map(_.registry.meter(name).getCount)
+  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 097e0bc19..4f9ba3053 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -66,8 +66,6 @@ class BatchJobSubmission(
   extends KyuubiOperation(session) {
   import BatchJobSubmission._
 
-  override def statement: String = "BATCH_JOB_SUBMISSION"
-
   override def shouldRunAsync: Boolean = true
 
   private val _operationLog = OperationLog.createOperationLog(session, getHandle)
@@ -306,9 +304,7 @@ class BatchJobSubmission(
         case e: IOException => error(e.getMessage, e)
       }
 
-      MetricsSystem.tracing(_.decCount(
-        MetricRegistry.name(OPERATION_OPEN, statement.toLowerCase(Locale.getDefault))))
-
+      MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
       // fast fail
       if (isTerminalState(state)) {
         killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 9f13980c1..2d28c767e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -38,6 +38,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
   MetricsSystem.tracing { ms =>
     ms.incCount(MetricRegistry.name(OPERATION_OPEN, opType))
     ms.incCount(MetricRegistry.name(OPERATION_TOTAL, opType))
+    ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, state.toString.toLowerCase))
     ms.incCount(MetricRegistry.name(OPERATION_TOTAL))
     ms.markMeter(MetricRegistry.name(OPERATION_STATE, state.toString.toLowerCase))
   }
@@ -156,8 +157,11 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
   override def shouldRunAsync: Boolean = false
 
   override def setState(newState: OperationState): Unit = {
+    MetricsSystem.tracing { ms =>
+      ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, state.toString.toLowerCase), -1)
+      ms.markMeter(MetricRegistry.name(OPERATION_STATE, opType, newState.toString.toLowerCase))
+      ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase))
+    }
     super.setState(newState)
-    MetricsSystem.tracing(
-      _.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase)))
   }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 7fdd7a7e2..48d4e8429 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -34,7 +34,9 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY, APP_URL_KEY}
 import org.apache.kyuubi.engine.spark.{SparkBatchProcessBuilder, SparkProcessBuilder}
-import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
+import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.server.KyuubiRestFrontendService
 import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
 import org.apache.kyuubi.server.metadata.api.Metadata
@@ -592,4 +594,50 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
     val batchSession = sessionManager.getBatchSessionImpl(SessionHandle.fromUUID(batch.getId))
     assert(batchSession.ipAddress === realClientIp)
   }
+
+  test("expose the metrics with operation type and current state") {
+    assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0)
+    assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0)
+    assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
+    val originalCanceledCounter = getBatchJobSubmissionStateCounter(OperationState.CANCELED)
+
+    val appName = "spark-batch-submission"
+    val requestObj = new BatchRequest(
+      "spark",
+      sparkProcessBuilder.mainResource.get,
+      sparkProcessBuilder.mainClass,
+      appName,
+      Map(
+        "spark.master" -> "local",
+        s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+        s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000").asJava,
+      Seq.empty[String].asJava)
+
+    val response = webTarget.path("api/v1/batches")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == response.getStatus)
+    val batch = response.readEntity(classOf[Batch])
+
+    assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) +
+      getBatchJobSubmissionStateCounter(OperationState.PENDING) +
+      getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 1)
+
+    val deleteResp = webTarget.path(s"api/v1/batches/${batch.getId}")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(200 == deleteResp.getStatus)
+
+    assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0)
+    assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0)
+    assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
+    assert(
+      getBatchJobSubmissionStateCounter(OperationState.CANCELED) - originalCanceledCounter === 1)
+  }
+
+  private def getBatchJobSubmissionStateCounter(state: OperationState): Long = {
+    val opType = classOf[BatchJobSubmission].getSimpleName
+    val counterName = s"${MetricsConstants.OPERATION_STATE}.$opType.${state.toString.toLowerCase}"
+    MetricsSystem.meterValue(counterName).getOrElse(0L)
+  }
 }