You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/08 09:14:16 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1393][TASK-2] Add operation number metric

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7beee52  [KYUUBI #1393][TASK-2] Add operation number metric
7beee52 is described below

commit 7beee524489dab230e26eaf5c0a5a3b96c573b2d
Author: zhenjiaguo <zh...@163.com>
AuthorDate: Wed Dec 8 17:13:30 2021 +0800

    [KYUUBI #1393][TASK-2] Add operation number metric
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    Current connection number is already had. I add current operation number.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #1485 from zhenjiaguo/add-operation-number.
    
    Closes #1393
    
    beccdfed [zhenjiaguo] when cancel we need decCount the open operation
    773d01ee [zhenjiaguo] when cancel we need decCount the open operation
    59001995 [zhenjiaguo] change variable name
    83f64fb8 [zhenjiaguo] unify operation metric
    daf0b6a7 [zhenjiaguo] add operation number
    
    Authored-by: zhenjiaguo <zh...@163.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../org/apache/kyuubi/operation/OperationType.scala  |  2 +-
 .../org/apache/kyuubi/metrics/MetricsConstants.scala | 11 ++++++-----
 .../apache/kyuubi/metrics/MetricsSystemSuite.scala   |  6 +++---
 .../apache/kyuubi/operation/ExecuteStatement.scala   |  7 -------
 .../apache/kyuubi/operation/KyuubiOperation.scala    | 20 ++++++++++++++++----
 .../kyuubi/operation/KyuubiOperationManager.scala    |  7 +++++++
 6 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
index 482d5d6..e3f67d3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
@@ -23,7 +23,7 @@ object OperationType extends Enumeration {
 
   type OperationType = Value
   val UNKNOWN_OPERATION, EXECUTE_STATEMENT, GET_TYPE_INFO, GET_CATALOGS, GET_SCHEMAS, GET_TABLES,
-      GET_TABLE_TYPES, GET_COLUMNS, GET_FUNCTIONS, LAUNCH_ENGINE = Value
+      GET_TABLE_TYPES, GET_COLUMNS, GET_FUNCTIONS = Value
 
   def getOperationType(from: TOperationType): OperationType = {
     from match {
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
index e3214cb..9a73ccc 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
@@ -30,13 +30,14 @@ object MetricsConstants {
   final val CONN_FAIL: String = CONN + "failed"
   final val CONN_TOTAL: String = CONN + "total"
 
-  final private val STATEMENT = KYUUBI + "statement."
-  final val STATEMENT_OPEN: String = STATEMENT + "opened"
-  final val STATEMENT_FAIL: String = STATEMENT + "failed"
-  final val STATEMENT_TOTAL: String = STATEMENT + "total"
-
   final private val ENGINE = KYUUBI + "engine."
   final val ENGINE_FAIL: String = ENGINE + "failed"
   final val ENGINE_TIMEOUT: String = ENGINE + "timeout"
   final val ENGINE_TOTAL: String = ENGINE + "total"
+
+  final private val OPERATION = KYUUBI + "operation."
+  final val OPERATION_OPEN: String = OPERATION + "opened"
+  final val OPERATION_FAIL: String = OPERATION + "failed"
+  final val OPERATION_TOTAL: String = OPERATION + "total"
+
 }
diff --git a/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala b/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala
index e049d97..611531d 100644
--- a/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala
+++ b/kyuubi-metrics/src/test/scala/org/apache/kyuubi/metrics/MetricsSystemSuite.scala
@@ -85,10 +85,10 @@ class MetricsSystemSuite extends KyuubiFunSuite {
     metricsSystem.start()
     val reportFile = Paths.get(reportPath.toString, "report.json")
     checkJsonFileMetrics(reportFile, "heap.usage")
-    metricsSystem.incCount(MetricsConstants.STATEMENT_TOTAL)
+    metricsSystem.incCount(MetricsConstants.OPERATION_TOTAL)
 
-    checkJsonFileMetrics(reportFile, MetricsConstants.STATEMENT_TOTAL)
-    metricsSystem.decCount(MetricsConstants.STATEMENT_TOTAL)
+    checkJsonFileMetrics(reportFile, MetricsConstants.OPERATION_TOTAL)
+    metricsSystem.decCount(MetricsConstants.OPERATION_TOTAL)
     metricsSystem.registerGauge(MetricsConstants.CONN_OPEN, 20181117, 0)
     checkJsonFileMetrics(reportFile, MetricsConstants.CONN_OPEN)
     checkJsonFileMetrics(reportFile, "20181117")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index df4d282..398056b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -26,8 +26,6 @@ import org.apache.thrift.TException
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.events.KyuubiStatementEvent
-import org.apache.kyuubi.metrics.MetricsConstants._
-import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
@@ -67,10 +65,6 @@ class ExecuteStatement(
 
   private def executeStatement(): Unit = {
     try {
-      MetricsSystem.tracing { ms =>
-        ms.incCount(STATEMENT_OPEN)
-        ms.incCount(STATEMENT_TOTAL)
-      }
       // We need to avoid executing query in sync mode, because there is no heartbeat mechanism
       // in thrift protocol, in sync mode, we cannot distinguish between long-run query and
       // engine crash without response before socket read timeout.
@@ -183,7 +177,6 @@ class ExecuteStatement(
   }
 
   override def close(): Unit = {
-    MetricsSystem.tracing(_.decCount(STATEMENT_OPEN))
     super.close()
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index a02a6bc..19f4c4b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -26,7 +26,7 @@ import org.apache.thrift.TException
 import org.apache.thrift.transport.TTransportException
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
-import org.apache.kyuubi.metrics.MetricsConstants.STATEMENT_FAIL
+import org.apache.kyuubi.metrics.MetricsConstants.{OPERATION_FAIL, OPERATION_OPEN, OPERATION_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationType.OperationType
@@ -36,6 +36,17 @@ import org.apache.kyuubi.util.ThriftUtils
 abstract class KyuubiOperation(opType: OperationType, session: Session)
   extends AbstractOperation(opType, session) {
 
+  private val opTypeName = (opType match {
+    case OperationType.UNKNOWN_OPERATION => statement
+    case _ => opType.toString
+  }).toLowerCase
+
+  MetricsSystem.tracing { ms =>
+    ms.incCount(MetricRegistry.name(OPERATION_OPEN, opTypeName))
+    ms.incCount(MetricRegistry.name(OPERATION_TOTAL, opTypeName))
+    ms.incCount(MetricRegistry.name(OPERATION_TOTAL))
+  }
+
   protected[operation] lazy val client = session.asInstanceOf[KyuubiSessionImpl].client
 
   @volatile protected var _remoteOpHandle: TOperationHandle = _
@@ -53,9 +64,8 @@ abstract class KyuubiOperation(opType: OperationType, session: Session)
           warn(s"Ignore exception in terminal state with $statementId: $e")
         } else {
           val errorType = e.getClass.getSimpleName
-          MetricsSystem.tracing {
-            _.incCount(MetricRegistry.name(STATEMENT_FAIL, errorType))
-          }
+          MetricsSystem.tracing(_.incCount(
+            MetricRegistry.name(OPERATION_FAIL, opTypeName, errorType)))
           val ke = e match {
             case kse: KyuubiSQLException => kse
             case te: TTransportException
@@ -91,6 +101,7 @@ abstract class KyuubiOperation(opType: OperationType, session: Session)
   override def cancel(): Unit = state.synchronized {
     if (!isClosedOrCanceled) {
       setState(OperationState.CANCELED)
+      MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opTypeName)))
       if (_remoteOpHandle != null) {
         try {
           client.cancelOperation(_remoteOpHandle)
@@ -105,6 +116,7 @@ abstract class KyuubiOperation(opType: OperationType, session: Session)
   override def close(): Unit = state.synchronized {
     if (!isClosedOrCanceled) {
       setState(OperationState.CLOSED)
+      MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opTypeName)))
       if (_remoteOpHandle != null) {
         try {
           getOperationLog.foreach(_.close())
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index a606eb2..ba3267d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -23,6 +23,8 @@ import org.apache.hive.service.rpc.thrift.TRowSet
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
+import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
+import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
 import org.apache.kyuubi.util.ThriftUtils
@@ -135,4 +137,9 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
         }
     }
   }
+
+  override def start(): Unit = synchronized {
+    MetricsSystem.tracing(_.registerGauge(OPERATION_OPEN, getOperationCount, 0))
+    super.start()
+  }
 }