You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/15 06:49:14 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3170] Expose thrift binary connection metrics

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23ad7801c [KYUUBI #3170] Expose thrift binary connection metrics
23ad7801c is described below

commit 23ad7801c1d636c98c631e4b61129d67fdc7215a
Author: Tianlin Liao <ti...@ebay.com>
AuthorDate: Mon Aug 15 14:49:04 2022 +0800

    [KYUUBI #3170] Expose thrift binary connection metrics
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3229 from lightning-L/kyuubi-3170.
    
    Closes #3170
    
    ccb14571 [Tianlin Liao] [KYUUBI #3170] Expose thrift binary connection metrics
    
    Authored-by: Tianlin Liao <ti...@ebay.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/service/TBinaryFrontendService.scala    |  2 +-
 .../apache/kyuubi/metrics/MetricsConstants.scala   |  5 ++
 .../server/KyuubiTBinaryFrontendService.scala      | 31 +++++++-
 .../server/KyuubiTBinaryFrontendServiceSuite.scala | 82 ++++++++++++++++++++++
 4 files changed, 118 insertions(+), 2 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
index 6cf36a3c4..c6e218754 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
@@ -46,7 +46,7 @@ abstract class TBinaryFrontendService(name: String)
     conf.get(FRONTEND_THRIFT_BINARY_BIND_HOST)
   final override protected lazy val portNum: Int = conf.get(FRONTEND_THRIFT_BINARY_BIND_PORT)
 
-  private var server: Option[TServer] = None
+  protected var server: Option[TServer] = None
 
   // Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383
 
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
index 0d31de837..342f6e38f 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
@@ -32,6 +32,7 @@ object MetricsConstants {
 
   final private val CONN = KYUUBI + "connection."
   final private val THRIFT_HTTP_CONN = KYUUBI + "thrift.http.connection."
+  final private val THRIFT_BINARY_CONN = KYUUBI + "thrift.binary.connection."
 
   final val CONN_OPEN: String = CONN + "opened"
   final val CONN_FAIL: String = CONN + "failed"
@@ -41,6 +42,10 @@ object MetricsConstants {
   final val THRIFT_HTTP_CONN_FAIL: String = THRIFT_HTTP_CONN + "failed"
   final val THRIFT_HTTP_CONN_TOTAL: String = THRIFT_HTTP_CONN + "total"
 
+  final val THRIFT_BINARY_CONN_OPEN: String = THRIFT_BINARY_CONN + "opened"
+  final val THRIFT_BINARY_CONN_FAIL: String = THRIFT_BINARY_CONN + "failed"
+  final val THRIFT_BINARY_CONN_TOTAL: String = THRIFT_BINARY_CONN + "total"
+
   final private val ENGINE = KYUUBI + "engine."
   final val ENGINE_FAIL: String = ENGINE + "failed"
   final val ENGINE_TIMEOUT: String = ENGINE + "timeout"
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala
index ffc083ac8..069bc63e2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendService.scala
@@ -21,13 +21,18 @@ import java.util.Base64
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hive.service.rpc.thrift._
+import org.apache.thrift.protocol.TProtocol
+import org.apache.thrift.server.ServerContext
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.cli.Handle
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiReservedKeys._
 import org.apache.kyuubi.ha.client.{KyuubiServiceDiscovery, ServiceDiscovery}
+import org.apache.kyuubi.metrics.MetricsConstants._
+import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
-import org.apache.kyuubi.service.TFrontendService.{CURRENT_SERVER_CONTEXT, OK_STATUS}
+import org.apache.kyuubi.service.TFrontendService.{CURRENT_SERVER_CONTEXT, FeServiceServerContext, OK_STATUS}
 import org.apache.kyuubi.session.KyuubiSessionImpl
 
 final class KyuubiTBinaryFrontendService(
@@ -44,6 +49,30 @@ final class KyuubiTBinaryFrontendService(
     }
   }
 
+  override def initialize(conf: KyuubiConf): Unit = synchronized {
+    super.initialize(conf)
+
+    server.foreach(_.setServerEventHandler(new FeTServerEventHandler() {
+      override def createContext(input: TProtocol, output: TProtocol): ServerContext = {
+        MetricsSystem.tracing { ms =>
+          ms.incCount(THRIFT_BINARY_CONN_OPEN)
+          ms.incCount(THRIFT_BINARY_CONN_TOTAL)
+        }
+        new FeServiceServerContext()
+      }
+
+      override def deleteContext(
+          serverContext: ServerContext,
+          input: TProtocol,
+          output: TProtocol): Unit = {
+        super.deleteContext(serverContext, input, output)
+        MetricsSystem.tracing { ms =>
+          ms.decCount(THRIFT_BINARY_CONN_OPEN)
+        }
+      }
+    }))
+  }
+
   override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
     debug(req.toString)
     info("Client protocol version: " + req.getClient_protocol)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendServiceSuite.scala
new file mode 100644
index 000000000..69c10e730
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiTBinaryFrontendServiceSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import org.apache.hive.service.rpc.thrift.TOpenSessionReq
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.operation.TClientTestUtils
+
+class KyuubiTBinaryFrontendServiceSuite extends WithKyuubiServer with KyuubiFunSuite {
+
+  override protected val conf: KyuubiConf = KyuubiConf()
+
+  test("connection metrics") {
+    val totalConnections =
+      MetricsSystem.counterValue(MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L)
+    val openConnections =
+      MetricsSystem.counterValue(MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L)
+
+    TClientTestUtils.withThriftClient(server.frontendServices.head) {
+      client =>
+        val req = new TOpenSessionReq()
+        req.setUsername(Utils.currentUser)
+        req.setPassword("anonymous")
+        client.OpenSession(req)
+
+        assert(MetricsSystem.counterValue(
+          MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections === 1)
+        assert(MetricsSystem.counterValue(
+          MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections === 1)
+
+        TClientTestUtils.withThriftClient(server.frontendServices.head) {
+          client =>
+            val req = new TOpenSessionReq()
+            req.setUsername(Utils.currentUser)
+            req.setPassword("anonymous")
+            client.OpenSession(req)
+
+            assert(MetricsSystem.counterValue(
+              MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections
+              === 2)
+            assert(MetricsSystem.counterValue(
+              MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections
+              === 2)
+        }
+
+        eventually(timeout(1.seconds), interval(200.milliseconds)) {
+          assert(MetricsSystem.counterValue(
+            MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections
+            === 2)
+          assert(MetricsSystem.counterValue(
+            MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections === 1)
+        }
+    }
+
+    eventually(timeout(1.seconds), interval(200.milliseconds)) {
+      assert(MetricsSystem.counterValue(
+        MetricsConstants.THRIFT_BINARY_CONN_TOTAL).getOrElse(0L) - totalConnections
+        === 2)
+      assert(MetricsSystem.counterValue(
+        MetricsConstants.THRIFT_BINARY_CONN_OPEN).getOrElse(0L) - openConnections === 0)
+    }
+  }
+}