You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/15 06:49:14 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3170] Expose thrift binary connection metrics
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 23ad7801c [KYUUBI #3170] Expose thrift binary connection metrics
23ad7801c is described below
commit 23ad7801c1d636c98c631e4b61129d67fdc7215a
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Mon Aug 15 14:49:04 2022 +0800
[KYUUBI #3170] Expose thrift binary connection metrics
### _Why are the changes needed?_
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3229 from lightning-L/kyuubi-3170.
Closes #3170
ccb14571 [Tianlin Liao] [KYUUBI #3170] Expose thrift binary connection metrics
Authored-by: Tianlin Liao <ti...@ebay.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/service/TBinaryFrontendService.scala | 2 +-
.../apache/kyuubi/metrics/MetricsConstants.scala | 5 ++
.../server/KyuubiTBinaryFrontendService.scala | 31 +++++++-
.../server/KyuubiTBinaryFrontendServiceSuite.scala | 82 ++++++++++++++++++++++
4 files changed, 118 insertions(+), 2 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
index 6cf36a3c4..c6e218754 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
@@ -46,7 +46,7 @@ abstract class TBinaryFrontendService(name: String)
conf.get(FRONTEND_THRIFT_BINARY_BIND_HOST)
final override protected lazy val portNum: Int = conf.get(FRONTEND_THRIFT_BINARY_BIND_PORT)
- private var server: Option[TServer] = None
+ protected var server: Option[TServer] = None
// Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
index 0d31de837..342f6e38f 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
@@ -32,6 +32,7 @@ object MetricsConstants {
final private val CONN = KYUUBI + "connection."
final private val THRIFT_HTTP_CONN = KYUUBI + "thrift.http.connection."
+ final private val THRIFT_BINARY_CONN = KYUUBI + "thrift.binary.connection."
final val CONN_OPEN: String = CONN + "opened"
final val CONN_FAIL: String = CONN + "failed"
@@ -41,6 +42,10 @@ object MetricsConstants {
final val THRIFT_HTTP_CONN_FAIL: String = THRIFT_HTTP_CONN + "failed"
final val THRIFT_HTTP_CONN_TOTAL: String = THRIFT_HTTP_CONN + "total"
+ final val THRIFT_BINARY_CONN_OPEN: String = THRIFT_BINARY_CONN + "opened"
+ final val THRIFT_BINARY_CONN_FAIL: String = THRIFT_BINARY_CONN + "failed"
+ final val THRIFT_BINARY_CONN_TOTAL: String = THRIFT_BINARY_CONN + "total"
+
final private val ENGINE = KYUUBI + "engine."
final val ENGINE_FAIL: String = ENGINE + "failed"
final val ENGINE_TIMEOUT: String = ENGINE + "timeout"
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala
index ffc083ac8..069bc63e2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala
@@ -21,13 +21,18 @@ import java.util.Base64
import org.apache.hadoop.conf.Configuration
import org.apache.hive.service.rpc.thrift._
+import org.apache.thrift.protocol.TProtocol
+import org.apache.thrift.server.ServerContext
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.cli.Handle
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.ha.client.{KyuubiServiceDiscovery, ServiceDiscovery}
+import org.apache.kyuubi.metrics.MetricsConstants._
+import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
-import org.apache.kyuubi.service.TFrontendService.{CURRENT_SERVER_CONTEXT, OK_STATUS}
+import org.apache.kyuubi.service.TFrontendService.{CURRENT_SERVER_CONTEXT, FeServiceServerContext, OK_STATUS}
import org.apache.kyuubi.session.KyuubiSessionImpl
final class KyuubiTBinaryFrontendService(
@@ -44,6 +49,30 @@ final class KyuubiTBinaryFrontendService(
}
}
+ override def initialize(conf: KyuubiConf): Unit = synchronized {
+ super.initialize(conf)
+
+ server.foreach(_.setServerEventHandler(new FeTServerEventHandler() {
+ override def createContext(input: TProtocol, output: TProtocol): ServerContext = {
+ MetricsSystem.tracing { ms =>
+ ms.incCount(THRIFT_BINARY_CONN_OPEN)
+ ms.incCount(THRIFT_BINARY_CONN_TOTAL)
+ }
+ new FeServiceServerContext()
+ }
+
+ override def deleteContext(
+ serverContext: ServerContext,
+ input: TProtocol,
+ output: TProtocol): Unit = {
+ super.deleteContext(serverContext, input, output)
+ MetricsSystem.tracing { ms =>
+ ms.decCount(THRIFT_BINARY_CONN_OPEN)
+ }
+ }
+ }))
+ }
+
override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
debug(req.toString)
info("Client protocol version: " + req.getClient_protocol)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendServiceSuite.scala
new file mode 100644
index 000000000..69c10e730
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendServiceSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.operation.TClientTestUtils
+
+class KyuubiTBinaryFrontendServiceSuite extends WithKyuubiServer with KyuubiFunSuite {
+
+ override protected val conf: KyuubiConf = KyuubiConf()
+
+ test("connection metrics") {
+ val totalConnections =
+ MetricsSystem.counterValue(MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L)
+ val openConnections =
+ MetricsSystem.counterValue(MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L)
+
+ TClientTestUtils.withThriftClient(server.frontendServices.head) {
+ client =>
+ val req = new TOpenSessionReq()
+ req.setUsername(Utils.currentUser)
+ req.setPassword("anonymous")
+ client.OpenSession(req)
+
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections === 1)
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections === 1)
+
+ TClientTestUtils.withThriftClient(server.frontendServices.head) {
+ client =>
+ val req = new TOpenSessionReq()
+ req.setUsername(Utils.currentUser)
+ req.setPassword("anonymous")
+ client.OpenSession(req)
+
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections
+ === 2)
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections
+ === 2)
+ }
+
+ eventually(timeout(1.seconds), interval(200.milliseconds)) {
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections
+ === 2)
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections === 1)
+ }
+ }
+
+ eventually(timeout(1.seconds), interval(200.milliseconds)) {
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections
+ === 2)
+ assert(MetricsSystem.counterValue(
+ MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections === 0)
+ }
+ }
+}