You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2021/12/20 02:00:12 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1586] Add time metric on each KyuubiBackendService method

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f3dc1fd  [KYUUBI #1586] Add time metric on each KyuubiBackendService method
f3dc1fd is described below

commit f3dc1fdecd6ef19d75898bffc9f159f5d09fd368
Author: zhenjiaguo <zh...@163.com>
AuthorDate: Mon Dec 20 09:59:57 2021 +0800

    [KYUUBI #1586] Add time metric on each KyuubiBackendService method
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Adding a time metric on each `KyuubiBackendService` method helps us inspect the internal running status of the Kyuubi Server. It also indirectly reflects the RPC call time when using Kyuubi.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    ![Screenshot from 2021-12-19 12-26-39](https://user-images.githubusercontent.com/29809822/146663963-7e483f45-198c-4fd7-b039-211dcf26dde5.png)
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1588 from zhenjiaguo/add_be_method_metric.
    
    Closes #1586
    
    a001eb96 [zhenjiaguo] Add time metric on each KyuubiBackendService method
    
    Authored-by: zhenjiaguo <zh...@163.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../apache/kyuubi/metrics/MetricsConstants.scala   |  18 +++
 .../org/apache/kyuubi/metrics/MetricsSystem.scala  |   5 +
 .../kyuubi/server/BackendServiceTimeMetric.scala   | 169 +++++++++++++++++++++
 .../org/apache/kyuubi/server/KyuubiServer.scala    |   3 +-
 .../server/BackendServiceTimeMetricSuite.scala     |  73 +++++++++
 5 files changed, 267 insertions(+), 1 deletion(-)

diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
index 9a73ccc..2fcb102 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
@@ -40,4 +40,22 @@ object MetricsConstants {
   final val OPERATION_FAIL: String = OPERATION + "failed"
   final val OPERATION_TOTAL: String = OPERATION + "total"
 
+  final private val BACKEND_SERVICE = KYUUBI + "backend_service." // histogram name prefix
+  final val OPEN_SESSION_MS = BACKEND_SERVICE + "open_session_ms"
+  final val CLOSE_SESSION_MS = BACKEND_SERVICE + "close_session_ms"
+  final val GET_INFO_MS = BACKEND_SERVICE + "get_info_ms"
+  final val EXECUTE_STATEMENT_MS = BACKEND_SERVICE + "execute_statement_ms"
+  final val GET_TYPE_INFO_MS = BACKEND_SERVICE + "get_type_info_ms"
+  final val GET_CATALOGS_MS = BACKEND_SERVICE + "get_catalogs_ms"
+  final val GET_SCHEMAS_MS = BACKEND_SERVICE + "get_schemas_ms"
+  final val GET_TABLES_MS = BACKEND_SERVICE + "get_tables_ms"
+  final val GET_TABLE_TYPES_MS = BACKEND_SERVICE + "get_table_types_ms"
+  final val GET_COLUMNS_MS = BACKEND_SERVICE + "get_columns_ms"
+  final val GET_FUNCTIONS_MS = BACKEND_SERVICE + "get_functions_ms"
+  final val GET_OPERATION_STATUS_MS = BACKEND_SERVICE + "get_operation_status_ms"
+  final val CANCEL_OPERATION_MS = BACKEND_SERVICE + "cancel_operation_ms"
+  final val CLOSE_OPERATION_MS = BACKEND_SERVICE + "close_operation_ms"
+  final val GET_RESULT_SET_METADATA_MS = BACKEND_SERVICE + "get_result_set_metadata_ms"
+  final val FETCH_RESULTS_MS = BACKEND_SERVICE + "fetch_results_ms"
+
 }
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
index a6200a3..ec7a142 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
@@ -42,6 +42,11 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
     counter.dec(1L)
   }
 
+  /** Feeds `value` into the histogram that `registry` returns for `key`. */
+  def updateHistogram(key: String, value: Long): Unit = {
+    registry.histogram(key).update(value)
+  }
+
   def registerGauge[T](name: String, value: => T, default: T): Unit = {
     registry.register(
       MetricRegistry.name(name),
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala
new file mode 100644
index 0000000..76c9394
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.operation.{OperationHandle, OperationStatus}
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.service.BackendService
+import org.apache.kyuubi.session.SessionHandle
+
+/** Stackable mixin recording how long each [[BackendService]] call takes, in milliseconds. */
+trait BackendServiceTimeMetric extends BackendService {
+
+  // Runs `f` and, even when it throws, records the elapsed millis in histogram `name`.
+  // System.nanoTime is monotonic, so wall-clock adjustments cannot skew the duration.
+  @throws[Exception]
+  private def timeMetric[T](name: String)(f: => T): T = {
+    val startNanos = System.nanoTime()
+    try f
+    finally MetricsSystem.tracing(
+      _.updateHistogram(name, (System.nanoTime() - startNanos) / 1000000L))
+  }
+
+  /** Times `openSession` into the OPEN_SESSION_MS histogram. */
+  abstract override def openSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddr: String,
+      configs: Map[String, String]): SessionHandle =
+    timeMetric(MetricsConstants.OPEN_SESSION_MS) {
+      super.openSession(protocol, user, password, ipAddr, configs)
+    }
+
+  /** Times `closeSession` into the CLOSE_SESSION_MS histogram. */
+  abstract override def closeSession(sessionHandle: SessionHandle): Unit =
+    timeMetric(MetricsConstants.CLOSE_SESSION_MS) {
+      super.closeSession(sessionHandle)
+    }
+
+  /** Times `getInfo` into the GET_INFO_MS histogram. */
+  abstract override def getInfo(
+      sessionHandle: SessionHandle,
+      infoType: TGetInfoType): TGetInfoValue =
+    timeMetric(MetricsConstants.GET_INFO_MS) {
+      super.getInfo(sessionHandle, infoType)
+    }
+
+  /** Times `executeStatement` into the EXECUTE_STATEMENT_MS histogram. */
+  abstract override def executeStatement(
+      sessionHandle: SessionHandle,
+      statement: String,
+      runAsync: Boolean,
+      queryTimeout: Long): OperationHandle =
+    timeMetric(MetricsConstants.EXECUTE_STATEMENT_MS) {
+      super.executeStatement(sessionHandle, statement, runAsync, queryTimeout)
+    }
+
+  /** Times `getTypeInfo` into the GET_TYPE_INFO_MS histogram. */
+  abstract override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle =
+    timeMetric(MetricsConstants.GET_TYPE_INFO_MS) {
+      super.getTypeInfo(sessionHandle)
+    }
+
+  /** Times `getCatalogs` into the GET_CATALOGS_MS histogram. */
+  abstract override def getCatalogs(sessionHandle: SessionHandle): OperationHandle =
+    timeMetric(MetricsConstants.GET_CATALOGS_MS) {
+      super.getCatalogs(sessionHandle)
+    }
+
+  /** Times `getSchemas` into the GET_SCHEMAS_MS histogram. */
+  abstract override def getSchemas(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String): OperationHandle =
+    timeMetric(MetricsConstants.GET_SCHEMAS_MS) {
+      super.getSchemas(sessionHandle, catalogName, schemaName)
+    }
+
+  /** Times `getTables` into the GET_TABLES_MS histogram. */
+  abstract override def getTables(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: java.util.List[String]): OperationHandle =
+    timeMetric(MetricsConstants.GET_TABLES_MS) {
+      super.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes)
+    }
+
+  /** Times `getTableTypes` into the GET_TABLE_TYPES_MS histogram. */
+  abstract override def getTableTypes(sessionHandle: SessionHandle): OperationHandle =
+    timeMetric(MetricsConstants.GET_TABLE_TYPES_MS) {
+      super.getTableTypes(sessionHandle)
+    }
+
+  /** Times `getColumns` into the GET_COLUMNS_MS histogram. */
+  abstract override def getColumns(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      columnName: String): OperationHandle =
+    timeMetric(MetricsConstants.GET_COLUMNS_MS) {
+      super.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName)
+    }
+
+  /** Times `getFunctions` into the GET_FUNCTIONS_MS histogram. */
+  abstract override def getFunctions(
+      sessionHandle: SessionHandle,
+      catalogName: String,
+      schemaName: String,
+      functionName: String): OperationHandle =
+    timeMetric(MetricsConstants.GET_FUNCTIONS_MS) {
+      super.getFunctions(sessionHandle, catalogName, schemaName, functionName)
+    }
+
+  /** Times `getOperationStatus` into the GET_OPERATION_STATUS_MS histogram. */
+  abstract override def getOperationStatus(operationHandle: OperationHandle): OperationStatus =
+    timeMetric(MetricsConstants.GET_OPERATION_STATUS_MS) {
+      super.getOperationStatus(operationHandle)
+    }
+
+  /** Times `cancelOperation` into the CANCEL_OPERATION_MS histogram. */
+  abstract override def cancelOperation(operationHandle: OperationHandle): Unit =
+    timeMetric(MetricsConstants.CANCEL_OPERATION_MS) {
+      super.cancelOperation(operationHandle)
+    }
+
+  /** Times `closeOperation` into the CLOSE_OPERATION_MS histogram. */
+  abstract override def closeOperation(operationHandle: OperationHandle): Unit =
+    timeMetric(MetricsConstants.CLOSE_OPERATION_MS) {
+      super.closeOperation(operationHandle)
+    }
+
+  /** Times `getResultSetMetadata` into the GET_RESULT_SET_METADATA_MS histogram. */
+  abstract override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema =
+    timeMetric(MetricsConstants.GET_RESULT_SET_METADATA_MS) {
+      super.getResultSetMetadata(operationHandle)
+    }
+
+  /** Times `fetchResults` into the FETCH_RESULTS_MS histogram. */
+  abstract override def fetchResults(
+      operationHandle: OperationHandle,
+      orientation: FetchOrientation,
+      maxRows: Int,
+      fetchLog: Boolean): TRowSet =
+    timeMetric(MetricsConstants.FETCH_RESULTS_MS) {
+      super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
+    }
+
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 3196f69..e6e6f42 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -135,7 +135,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
 
   def this() = this(classOf[KyuubiServer].getSimpleName)
 
-  override val backendService: AbstractBackendService = new KyuubiBackendService()
+  override val backendService: AbstractBackendService =
+    new KyuubiBackendService() with BackendServiceTimeMetric // adds per-call timing metrics
 
   override lazy val frontendServices: Seq[AbstractFrontendService] =
     conf.get(FRONTEND_PROTOCOLS).map(FrontendProtocols.withName).map {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala
new file mode 100644
index 0000000..d25c266
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/BackendServiceTimeMetricSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.nio.file.{Path, Paths}
+import java.time.Duration
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.metrics.MetricsConf
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+/** Checks that backend-service call timings show up in the JSON metrics report. */
+class BackendServiceTimeMetricSuite extends WithKyuubiServer with HiveJDBCTestHelper {
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  // Kept ahead of `conf`, whose initializer reads this path.
+  val reportPath: Path = Utils.createTempDir()
+  override protected val conf: KyuubiConf = KyuubiConf()
+    .set(MetricsConf.METRICS_REPORTERS, Seq("JSON"))
+    .set(MetricsConf.METRICS_JSON_LOCATION, reportPath.toString)
+    .set(MetricsConf.METRICS_JSON_INTERVAL, Duration.ofMillis(100).toMillis)
+
+  test("backend service method time metric test") {
+    val jsonReader = new ObjectMapper()
+    val executeKey = "kyuubi.backend_service.execute_statement_ms"
+    val openSessionKey = "kyuubi.backend_service.open_session_ms"
+    def readReport() = jsonReader.readTree(Paths.get(reportPath.toString, "report.json").toFile)
+    // Sleeps 111 ms, just over the 100 ms METRICS_JSON_INTERVAL configured in `conf`.
+    def awaitNextReport(): Unit = Thread.sleep(Duration.ofMillis(111).toMillis)
+
+    withJdbcStatement() { stmt =>
+      stmt.execute("show databases")
+      awaitNextReport()
+
+      val firstReport = readReport()
+      assert(firstReport.has("histograms"))
+      val firstExecute = firstReport.get("histograms").get(executeKey)
+      assert(firstExecute.get("count").asInt() == 1)
+      assert(firstExecute.get("mean").asDouble() > 0)
+
+      stmt.execute("show tables")
+      awaitNextReport()
+
+      val secondHistograms = readReport().get("histograms")
+      val openSession = secondHistograms.get(openSessionKey)
+      assert(openSession.get("count").asInt() == 1)
+      assert(openSession.get("min").asInt() > 0)
+      val secondExecute = secondHistograms.get(executeKey)
+      assert(secondExecute.get("count").asInt() == 2)
+      def stat(field: String): Double = secondExecute.get(field).asDouble()
+      assert(stat("max") >= stat("mean") && stat("mean") >= stat("min"))
+    }
+  }
+}