You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/05/08 14:46:51 UTC

[kyuubi] branch master updated: [KYUUBI #4733] Introduce Kafka event logger for server events

This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d73ec64b3 [KYUUBI #4733] Introduce Kafka event logger for server events
d73ec64b3 is described below

commit d73ec64b371cfa3e94d44e39a00c275acdda6abc
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Mon May 8 22:45:52 2023 +0800

    [KYUUBI #4733] Introduce Kafka event logger for server events
    
    ### _Why are the changes needed?_
    
    - introduce new event logger type `KAFKA`
    - send server events to the Kafka topic with initializing and closing Kafka producer properly with server's lifecyle
    - use Kafka 3.4.0 as the client version, and tested with Kakfa servers of 2.8.x and 3.4.x
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4733 from bowenliang123/kafka-logger.
    
    Closes #4733
    
    b5220d234 [liangbowen] introduce kafka server event logger
    
    Authored-by: liangbowen <li...@gf.com.cn>
    Signed-off-by: liangbowen <li...@gf.com.cn>
---
 LICENSE-binary                                     |   4 +
 NOTICE-binary                                      |  25 ++++-
 dev/dependencyList                                 |   4 +
 docs/deployment/settings.md                        |  26 ++---
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  26 ++++-
 kyuubi-events/pom.xml                              |   5 +
 .../kyuubi/events/EventHandlerRegister.scala       |   7 ++
 .../org/apache/kyuubi/events/EventLoggerType.scala |   3 +-
 .../events/handler/KafkaLoggingEventHandler.scala  |  70 +++++++++++++
 kyuubi-server/pom.xml                              |  17 ++++
 .../kyuubi/events/ServerEventHandlerRegister.scala |  21 +++-
 .../handler/ServerKafkaLoggingEventHandler.scala   |  15 ++-
 .../ServerKafkaLoggingEventHandlerSuite.scala      | 113 +++++++++++++++++++++
 pom.xml                                            |  14 +++
 14 files changed, 327 insertions(+), 23 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index a52ea95fb..542259658 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -321,6 +321,9 @@ io.vertx:vertx-grpc
 org.apache.zookeeper:zookeeper
 com.squareup.retrofit2:retrofit
 com.squareup.okhttp3:okhttp
+org.apache.kafka:kafka-clients
+org.lz4:lz4-java
+org.xerial.snappy:snappy-java
 
 BSD
 ------------
@@ -332,6 +335,7 @@ com.thoughtworks.paranamer:paranamer
 dk.brics.automaton:automaton
 com.google.protobuf:protobuf-java-util
 com.google.protobuf:protobuf-java
+com.github.luben:zstd-jni
 
 Eclipse Distribution License - v 1.0
 ------------------------------------
diff --git a/NOTICE-binary b/NOTICE-binary
index ef58e21f6..16281d0d8 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -1236,7 +1236,7 @@ This product optionally depends on 'zstd-jni', a zstd-jni Java compression
 and decompression library, which can be obtained at:
 
   * LICENSE:
-    * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+    * license/LICENSE.zstd-jni.txt (BSD License)
   * HOMEPAGE:
     * https://github.com/luben/zstd-jni
 
@@ -1370,3 +1370,26 @@ decompression for Java., which can be obtained at:
   * HOMEPAGE:
     * https://github.com/hyperxpro/Brotli4j
 
+This product depends on 'kafka-clients', Java clients for Kafka,
+which can be obtained at:
+
+  * LICENSE:
+    * license/LICENSE.kafka.txt (Apache License 2.0)
+  * HOMEPAGE:
+    * https://github.com/apache/kafka
+
+This product optionally depends on 'snappy-java', Snappy compression and
+decompression for Java, which can be obtained at:
+
+  * LICENSE:
+    * license/LICENSE.snappy-java.txt (Apache License 2.0)
+  * HOMEPAGE:
+    * https://github.com/xerial/snappy-java
+
+This product optionally depends on 'lz4-java', Lz4 compression and
+decompression for Java, which can be obtained at:
+
+  * LICENSE:
+    * license/LICENSE.lz4-java.txt (Apache License 2.0)
+  * HOMEPAGE:
+    * https://github.com/lz4/lz4-java
diff --git a/dev/dependencyList b/dev/dependencyList
index 0abcc5196..11f2e2c3c 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -107,6 +107,7 @@ jetty-util-ajax/9.4.50.v20221201//jetty-util-ajax-9.4.50.v20221201.jar
 jetty-util/9.4.50.v20221201//jetty-util-9.4.50.v20221201.jar
 jline/0.9.94//jline-0.9.94.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
+kafka-clients/3.4.0//kafka-clients-3.4.0.jar
 kubernetes-client-api/6.4.1//kubernetes-client-api-6.4.1.jar
 kubernetes-client/6.4.1//kubernetes-client-6.4.1.jar
 kubernetes-httpclient-okhttp/6.4.1//kubernetes-httpclient-okhttp-6.4.1.jar
@@ -138,6 +139,7 @@ log4j-api/2.20.0//log4j-api-2.20.0.jar
 log4j-core/2.20.0//log4j-core-2.20.0.jar
 log4j-slf4j-impl/2.20.0//log4j-slf4j-impl-2.20.0.jar
 logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
+lz4-java/1.8.0//lz4-java-1.8.0.jar
 metrics-core/4.2.8//metrics-core-4.2.8.jar
 metrics-jmx/4.2.8//metrics-jmx-4.2.8.jar
 metrics-json/4.2.8//metrics-json-4.2.8.jar
@@ -181,6 +183,7 @@ simpleclient_tracer_otel/0.16.0//simpleclient_tracer_otel-0.16.0.jar
 simpleclient_tracer_otel_agent/0.16.0//simpleclient_tracer_otel_agent-0.16.0.jar
 slf4j-api/1.7.36//slf4j-api-1.7.36.jar
 snakeyaml/1.33//snakeyaml-1.33.jar
+snappy-java/1.1.8.4//snappy-java-1.1.8.4.jar
 swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
 swagger-core/2.2.1//swagger-core-2.2.1.jar
 swagger-integration/2.2.1//swagger-integration-2.2.1.jar
@@ -193,3 +196,4 @@ vertx-core/4.3.2//vertx-core-4.3.2.jar
 vertx-grpc/4.3.2//vertx-grpc-4.3.2.jar
 zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
 zookeeper/3.4.14//zookeeper-3.4.14.jar
+zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 0c7cdfc89..a358b4270 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -58,18 +58,20 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 ### Backend
 
-|                       Key                        |          Default          |                                                                                                                                                                                                                                                                    Meaning                                                                                                                                                  [...]
-|--------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| kyuubi.backend.engine.exec.pool.keepalive.time   | PT1M                      | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications                                                                                                                                                                                                                                                                  [...]
-| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S                     | Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications                                                                                                                                                                                                                                                                                                                                 [...]
-| kyuubi.backend.engine.exec.pool.size             | 100                       | Number of threads in the operation execution thread pool of SQL engine applications                                                                                                                                                                                                                                                                                                                                         [...]
-| kyuubi.backend.engine.exec.pool.wait.queue.size  | 100                       | Size of the wait queue for the operation execution thread pool in SQL engine applications                                                                                                                                                                                                                                                                                                                                   [...]
-| kyuubi.backend.server.event.json.log.path        | file:///tmp/kyuubi/events | The location of server events go for the built-in JSON logger                                                                                                                                                                                                                                                                                                                                                               [...]
-| kyuubi.backend.server.event.loggers                                         || A comma-separated list of server history loggers, where session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a clas [...]
-| kyuubi.backend.server.exec.pool.keepalive.time   | PT1M                      | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server                                                                                                                                                                                                                                                                            [...]
-| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S                     | Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server                                                                                                                                                                                                                                                                                                                                           [...]
-| kyuubi.backend.server.exec.pool.size             | 100                       | Number of threads in the operation execution thread pool of Kyuubi server                                                                                                                                                                                                                                                                                                                                                   [...]
-| kyuubi.backend.server.exec.pool.wait.queue.size  | 100                       | Size of the wait queue for the operation execution thread pool of Kyuubi server                                                                                                                                                                                                                                                                                                                                             [...]
+|                       Key                        |          Default          |                                                                                                                                                                                                                                                                                                                                                                                                                             [...]
+|--------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| kyuubi.backend.engine.exec.pool.keepalive.time   | PT1M                      | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications                                                                                                                                                                                                                                                                  [...]
+| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S                     | Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications                                                                                                                                                                                                                                                                                                                                 [...]
+| kyuubi.backend.engine.exec.pool.size             | 100                       | Number of threads in the operation execution thread pool of SQL engine applications                                                                                                                                                                                                                                                                                                                                         [...]
+| kyuubi.backend.engine.exec.pool.wait.queue.size  | 100                       | Size of the wait queue for the operation execution thread pool in SQL engine applications                                                                                                                                                                                                                                                                                                                                   [...]
+| kyuubi.backend.server.event.json.log.path        | file:///tmp/kyuubi/events | The location of server events go for the built-in JSON logger                                                                                                                                                                                                                                                                                                                                                               [...]
+| kyuubi.backend.server.event.kafka.close.timeout  | PT5S                      | Period to wait for Kafka producer of server event handlers to close.                                                                                                                                                                                                                                                                                                                                                        [...]
+| kyuubi.backend.server.event.kafka.topic          | &lt;undefined&gt;         | The topic of server events go for the built-in Kafka logger                                                                                                                                                                                                                                                                                                                                                                 [...]
+| kyuubi.backend.server.event.loggers                                         || A comma-separated list of server history loggers, where session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path</li> <li>KAFKA: the events will be serialized in JSON format and sent to topic of `kyuubi.backend.server.event.kafka.topic`. Note: For the configs of Kafka producer, please specify them with the prefix: `kyuubi.backend.s [...]
+| kyuubi.backend.server.exec.pool.keepalive.time   | PT1M                      | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server                                                                                                                                                                                                                                                                            [...]
+| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S                     | Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server                                                                                                                                                                                                                                                                                                                                           [...]
+| kyuubi.backend.server.exec.pool.size             | 100                       | Number of threads in the operation execution thread pool of Kyuubi server                                                                                                                                                                                                                                                                                                                                                   [...]
+| kyuubi.backend.server.exec.pool.wait.queue.size  | 100                       | Size of the wait queue for the operation execution thread pool of Kyuubi server                                                                                                                                                                                                                                                                                                                                             [...]
 
 ### Batch
 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a6a594063..8da336102 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2008,12 +2008,34 @@ object KyuubiConf {
       .stringConf
       .createWithDefault("file:///tmp/kyuubi/events")
 
+  val SERVER_EVENT_KAFKA_TOPIC: OptionalConfigEntry[String] =
+    buildConf("kyuubi.backend.server.event.kafka.topic")
+      .doc("The topic of server events go for the built-in Kafka logger")
+      .version("1.8.0")
+      .serverOnly
+      .stringConf
+      .createOptional
+
+  val SERVER_EVENT_KAFKA_CLOSE_TIMEOUT: ConfigEntry[Long] =
+    buildConf("kyuubi.backend.server.event.kafka.close.timeout")
+      .doc("Period to wait for Kafka producer of server event handlers to close.")
+      .version("1.8.0")
+      .serverOnly
+      .timeConf
+      .createWithDefault(Duration.ofMillis(5000).toMillis)
+
   val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.backend.server.event.loggers")
       .doc("A comma-separated list of server history loggers, where session/operation etc" +
         " events go.<ul>" +
         s" <li>JSON: the events will be written to the location of" +
         s" ${SERVER_EVENT_JSON_LOG_PATH.key}</li>" +
+        s" <li>KAFKA: the events will be serialized in JSON format" +
+        s" and sent to topic of `${SERVER_EVENT_KAFKA_TOPIC.key}`." +
+        s" Note: For the configs of Kafka producer," +
+        s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
+        s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +
+        s" </li>" +
         s" <li>JDBC: to be done</li>" +
         s" <li>CUSTOM: User-defined event handlers.</li></ul>" +
         " Note that: Kyuubi supports custom event handlers with the Java SPI." +
@@ -2026,7 +2048,9 @@ object KyuubiConf {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .toSequence()
-      .checkValue(_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM")), "Unsupported event loggers")
+      .checkValue(
+        _.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA")),
+        "Unsupported event loggers")
       .createWithDefault(Nil)
 
   @deprecated("using kyuubi.engine.spark.event.loggers instead", "1.6.0")
diff --git a/kyuubi-events/pom.xml b/kyuubi-events/pom.xml
index b97e9dffb..6b51fe015 100644
--- a/kyuubi-events/pom.xml
+++ b/kyuubi-events/pom.xml
@@ -37,6 +37,11 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
index 6c7e0893f..f75e4be4f 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
@@ -51,6 +51,10 @@ trait EventHandlerRegister extends Logging {
     throw new KyuubiException(s"Unsupported jdbc event logger.")
   }
 
+  protected def createKafkaEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
+    throw new KyuubiException(s"Unsupported kafka event logger.")
+  }
+
   private def loadEventHandler(
       eventLoggerType: EventLoggerType,
       kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
@@ -64,6 +68,9 @@ trait EventHandlerRegister extends Logging {
       case EventLoggerType.JDBC =>
         createJdbcEventHandler(kyuubiConf) :: Nil
 
+      case EventLoggerType.KAFKA =>
+        createKafkaEventHandler(kyuubiConf) :: Nil
+
       case EventLoggerType.CUSTOM =>
         EventHandlerLoader.loadCustom(kyuubiConf)
 
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
index a029a0fc5..987982371 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
@@ -21,6 +21,5 @@ object EventLoggerType extends Enumeration {
 
   type EventLoggerType = Value
 
-  // TODO: Only SPARK is done now
-  val SPARK, JSON, JDBC, CUSTOM = Value
+  val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
 }
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/KafkaLoggingEventHandler.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/KafkaLoggingEventHandler.scala
new file mode 100644
index 000000000..a245daef0
--- /dev/null
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/KafkaLoggingEventHandler.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import java.time.Duration
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.events.handler.KafkaLoggingEventHandler._
+
+/**
+ * This event logger logs events to Kafka.
+ */
+class KafkaLoggingEventHandler(
+    topic: String,
+    producerConf: Map[String, String],
+    kyuubiConf: KyuubiConf,
+    closeTimeoutInMs: Long) extends EventHandler[KyuubiEvent] with Logging {
+  private def defaultProducerConf: Properties = {
+    val conf = new Properties()
+    conf.setProperty("key.serializer", DEFAULT_SERIALIZER_CLASS)
+    conf.setProperty("value.serializer", DEFAULT_SERIALIZER_CLASS)
+    conf
+  }
+
+  private val normalizedProducerConf: Properties = {
+    val conf = defaultProducerConf
+    producerConf.foreach(p => conf.setProperty(p._1, p._2))
+    conf
+  }
+
+  private val kafkaProducer = new KafkaProducer[String, String](normalizedProducerConf)
+
+  override def apply(event: KyuubiEvent): Unit = {
+    try {
+      val record = new ProducerRecord[String, String](topic, event.eventType, event.toJson)
+      kafkaProducer.send(record)
+    } catch {
+      case e: Exception =>
+        error("Failed to send event in KafkaEventHandler", e)
+    }
+  }
+
+  override def close(): Unit = {
+    kafkaProducer.close(Duration.ofMillis(closeTimeoutInMs))
+  }
+}
+
+object KafkaLoggingEventHandler {
+  private val DEFAULT_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer"
+}
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index c4585b131..54d4507d5 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -395,6 +395,23 @@
             <artifactId>swagger-ui</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dimafeng</groupId>
+            <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dimafeng</groupId>
+            <artifactId>testcontainers-scala-kafka_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
index 4ddee48dd..ca6c776ac 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
@@ -19,8 +19,9 @@ package org.apache.kyuubi.events
 import java.net.InetAddress
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{SERVER_EVENT_JSON_LOG_PATH, SERVER_EVENT_LOGGERS}
-import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler}
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler, ServerKafkaLoggingEventHandler}
+import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 object ServerEventHandlerRegister extends EventHandlerRegister {
@@ -36,6 +37,22 @@ object ServerEventHandlerRegister extends EventHandlerRegister {
       kyuubiConf)
   }
 
+  override def createKafkaEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
+    val topic = kyuubiConf.get(SERVER_EVENT_KAFKA_TOPIC).getOrElse {
+      throw new IllegalArgumentException(s"${SERVER_EVENT_KAFKA_TOPIC.key} must be configured")
+    }
+    val closeTimeoutInMs = kyuubiConf.get(SERVER_EVENT_KAFKA_CLOSE_TIMEOUT)
+    val kafkaEventHandlerProducerConf =
+      kyuubiConf.getAllWithPrefix(KAFKA_SERVER_EVENT_HANDLER_PREFIX, "")
+        .filterKeys(
+          !List(SERVER_EVENT_KAFKA_TOPIC, SERVER_EVENT_KAFKA_CLOSE_TIMEOUT).map(_.key).contains(_))
+    ServerKafkaLoggingEventHandler(
+      topic,
+      kafkaEventHandlerProducerConf,
+      kyuubiConf,
+      closeTimeoutInMs)
+  }
+
   override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
     conf.get(SERVER_EVENT_LOGGERS)
   }
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandler.scala
similarity index 64%
copy from kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
copy to kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandler.scala
index a029a0fc5..a7421a057 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandler.scala
@@ -15,12 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.events
+package org.apache.kyuubi.events.handler
 
-object EventLoggerType extends Enumeration {
+import org.apache.kyuubi.config.KyuubiConf
 
-  type EventLoggerType = Value
+case class ServerKafkaLoggingEventHandler(
+    topic: String,
+    producerConf: Map[String, String],
+    kyuubiConf: KyuubiConf,
+    closeTimeoutInMs: Long)
+  extends KafkaLoggingEventHandler(topic, producerConf, kyuubiConf, closeTimeoutInMs)
 
-  // TODO: Only SPARK is done now
-  val SPARK, JSON, JDBC, CUSTOM = Value
+object ServerKafkaLoggingEventHandler {
+  val KAFKA_SERVER_EVENT_HANDLER_PREFIX = "kyuubi.backend.server.event.kafka"
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandlerSuite.scala
new file mode 100644
index 000000000..461414f3f
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandlerSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import java.time.Duration
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.util.Random
+
+import com.dimafeng.testcontainers.KafkaContainer
+import com.dimafeng.testcontainers.scalatest.TestContainerForAll
+import com.fasterxml.jackson.databind.json.JsonMapper
+import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
+import org.apache.kafka.clients.consumer.KafkaConsumer
+
+import org.apache.kyuubi._
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+abstract class ServerKafkaLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCTestHelper
+  with BatchTestHelper with TestContainerForAll {
+
+  /**
+   * `confluentinc/cp-kafka` is Confluent Community Docker Image for Apache Kafka.
+   * The list of compatibility for Kafka's version refers to:
+   * https://docs.confluent.io/platform/current/installation
+   * /versions-interoperability.html#cp-and-apache-ak-compatibility
+   */
+  protected val imageTag: String
+  override lazy val containerDef: KafkaContainer.Def =
+    KafkaContainer.Def(s"confluentinc/cp-kafka:$imageTag")
+  private val destTopic = "server-event-topic"
+  private val mapper = JsonMapper.builder().build()
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  override protected val conf: KyuubiConf = {
+    KyuubiConf()
+      .set(KyuubiConf.SERVER_EVENT_LOGGERS, Seq("KAFKA"))
+      .set(KyuubiConf.SERVER_EVENT_KAFKA_TOPIC, destTopic)
+  }
+
+  override def beforeAll(): Unit = withContainers { kafkaContainer =>
+    val bootstrapServers = kafkaContainer.bootstrapServers
+    createTopic(kafkaContainer.bootstrapServers, destTopic)
+    conf.set(s"$KAFKA_SERVER_EVENT_HANDLER_PREFIX.bootstrap.servers", bootstrapServers)
+
+    super.beforeAll()
+  }
+
+  private def createTopic(kafkaServerUrl: String, topic: String): Unit = {
+    val adminProps = new Properties
+    adminProps.setProperty("bootstrap.servers", kafkaServerUrl)
+    val adminClient = AdminClient.create(adminProps)
+    adminClient.createTopics(List(new NewTopic(topic, 1, 1.toShort)).asJava)
+    adminClient.close()
+  }
+
+  test("check server events sent to kafka topic") {
+    withContainers { kafkaContainer =>
+      val consumerConf = new Properties
+      Map(
+        "bootstrap.servers" -> kafkaContainer.bootstrapServers,
+        "group.id" -> s"server-kafka-logger-test-${Random.nextInt}",
+        "auto.offset.reset" -> "earliest",
+        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
+        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")
+        .foreach(p => consumerConf.setProperty(p._1, p._2))
+      val consumer = new KafkaConsumer[String, String](consumerConf)
+      try {
+        consumer.subscribe(List(destTopic).asJava)
+        eventually(timeout(10.seconds), interval(500.milliseconds)) {
+          val records = consumer.poll(Duration.ofMillis(500))
+          assert(records.count() > 0)
+          records.forEach { record =>
+            val jsonObj = mapper.readTree(record.value())
+            assertResult("kyuubi_server_info")(record.key)
+            assertResult(server.getName)(jsonObj.get("serverName").asText())
+          }
+        }
+      } finally {
+        consumer.close()
+      }
+    }
+  }
+}
+
+class ServerKafkaLoggingEventHandlerSuiteForKafka2 extends ServerKafkaLoggingEventHandlerSuite {
+  // equivalent to Apache Kafka 2.8.x
+  override val imageTag = "6.2.10"
+}
+
+class ServerKafkaLoggingEventHandlerSuiteForKafka3 extends ServerKafkaLoggingEventHandlerSuite {
+  // equivalent to Apache Kafka 3.3.x
+  override val imageTag = "7.3.3"
+}
diff --git a/pom.xml b/pom.xml
index af56eb167..2fe54e329 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
         <jetty.version>9.4.50.v20221201</jetty.version>
         <jline.version>0.9.94</jline.version>
         <junit.version>4.13.2</junit.version>
+        <kafka.version>3.4.0</kafka.version>
         <kubernetes-client.version>6.4.1</kubernetes-client.version>
         <kudu.version>1.15.0</kudu.version>
         <ldapsdk.version>6.0.5</ldapsdk.version>
@@ -545,6 +546,12 @@
                 <version>${testcontainers-scala.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.dimafeng</groupId>
+                <artifactId>testcontainers-scala-kafka_${scala.binary.version}</artifactId>
+                <version>${testcontainers-scala.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>io.fabric8</groupId>
                 <artifactId>kubernetes-client</artifactId>
@@ -1261,6 +1268,13 @@
                 </exclusions>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
+                <optional>true</optional>
+            </dependency>
+
             <dependency>
                 <groupId>com.github.scopt</groupId>
                 <artifactId>scopt_${scala.binary.version}</artifactId>