You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/05/08 14:46:51 UTC
[kyuubi] branch master updated: [KYUUBI #4733] Introduce Kafka event logger for server events
This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d73ec64b3 [KYUUBI #4733] Introduce Kafka event logger for server events
d73ec64b3 is described below
commit d73ec64b371cfa3e94d44e39a00c275acdda6abc
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Mon May 8 22:45:52 2023 +0800
[KYUUBI #4733] Introduce Kafka event logger for server events
### _Why are the changes needed?_
- introduce new event logger type `KAFKA`
- send server events to the Kafka topic, initializing and closing the Kafka producer properly along with the server's lifecycle
- use Kafka 3.4.0 as the client version, and tested with Kafka servers of 2.8.x and 3.4.x
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4733 from bowenliang123/kafka-logger.
Closes #4733
b5220d234 [liangbowen] introduce kafka server event logger
Authored-by: liangbowen <li...@gf.com.cn>
Signed-off-by: liangbowen <li...@gf.com.cn>
---
LICENSE-binary | 4 +
NOTICE-binary | 25 ++++-
dev/dependencyList | 4 +
docs/deployment/settings.md | 26 ++---
.../org/apache/kyuubi/config/KyuubiConf.scala | 26 ++++-
kyuubi-events/pom.xml | 5 +
.../kyuubi/events/EventHandlerRegister.scala | 7 ++
.../org/apache/kyuubi/events/EventLoggerType.scala | 3 +-
.../events/handler/KafkaLoggingEventHandler.scala | 70 +++++++++++++
kyuubi-server/pom.xml | 17 ++++
.../kyuubi/events/ServerEventHandlerRegister.scala | 21 +++-
.../handler/ServerKafkaLoggingEventHandler.scala | 15 ++-
.../ServerKafkaLoggingEventHandlerSuite.scala | 113 +++++++++++++++++++++
pom.xml | 14 +++
14 files changed, 327 insertions(+), 23 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index a52ea95fb..542259658 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -321,6 +321,9 @@ io.vertx:vertx-grpc
org.apache.zookeeper:zookeeper
com.squareup.retrofit2:retrofit
com.squareup.okhttp3:okhttp
+org.apache.kafka:kafka-clients
+org.lz4:lz4-java
+org.xerial.snappy:snappy-java
BSD
------------
@@ -332,6 +335,7 @@ com.thoughtworks.paranamer:paranamer
dk.brics.automaton:automaton
com.google.protobuf:protobuf-java-util
com.google.protobuf:protobuf-java
+com.github.luben:zstd-jni
Eclipse Distribution License - v 1.0
------------------------------------
diff --git a/NOTICE-binary b/NOTICE-binary
index ef58e21f6..16281d0d8 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -1236,7 +1236,7 @@ This product optionally depends on 'zstd-jni', a zstd-jni Java compression
and decompression library, which can be obtained at:
* LICENSE:
- * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+ * license/LICENSE.zstd-jni.txt (BSD License)
* HOMEPAGE:
* https://github.com/luben/zstd-jni
@@ -1370,3 +1370,26 @@ decompression for Java., which can be obtained at:
* HOMEPAGE:
* https://github.com/hyperxpro/Brotli4j
+This product depends on 'kafka-clients', Java clients for Kafka,
+which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.kafka.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/apache/kafka
+
+This product optionally depends on 'snappy-java', Snappy compression and
+decompression for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.snappy-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/xerial/snappy-java
+
+This product optionally depends on 'lz4-java', Lz4 compression and
+decompression for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lz4-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/lz4/lz4-java
diff --git a/dev/dependencyList b/dev/dependencyList
index 0abcc5196..11f2e2c3c 100644
--- a/dev/dependencyList
+++ b/dev/dependencyList
@@ -107,6 +107,7 @@ jetty-util-ajax/9.4.50.v20221201//jetty-util-ajax-9.4.50.v20221201.jar
jetty-util/9.4.50.v20221201//jetty-util-9.4.50.v20221201.jar
jline/0.9.94//jline-0.9.94.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
+kafka-clients/3.4.0//kafka-clients-3.4.0.jar
kubernetes-client-api/6.4.1//kubernetes-client-api-6.4.1.jar
kubernetes-client/6.4.1//kubernetes-client-6.4.1.jar
kubernetes-httpclient-okhttp/6.4.1//kubernetes-httpclient-okhttp-6.4.1.jar
@@ -138,6 +139,7 @@ log4j-api/2.20.0//log4j-api-2.20.0.jar
log4j-core/2.20.0//log4j-core-2.20.0.jar
log4j-slf4j-impl/2.20.0//log4j-slf4j-impl-2.20.0.jar
logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
+lz4-java/1.8.0//lz4-java-1.8.0.jar
metrics-core/4.2.8//metrics-core-4.2.8.jar
metrics-jmx/4.2.8//metrics-jmx-4.2.8.jar
metrics-json/4.2.8//metrics-json-4.2.8.jar
@@ -181,6 +183,7 @@ simpleclient_tracer_otel/0.16.0//simpleclient_tracer_otel-0.16.0.jar
simpleclient_tracer_otel_agent/0.16.0//simpleclient_tracer_otel_agent-0.16.0.jar
slf4j-api/1.7.36//slf4j-api-1.7.36.jar
snakeyaml/1.33//snakeyaml-1.33.jar
+snappy-java/1.1.8.4//snappy-java-1.1.8.4.jar
swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
swagger-core/2.2.1//swagger-core-2.2.1.jar
swagger-integration/2.2.1//swagger-integration-2.2.1.jar
@@ -193,3 +196,4 @@ vertx-core/4.3.2//vertx-core-4.3.2.jar
vertx-grpc/4.3.2//vertx-grpc-4.3.2.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper/3.4.14//zookeeper-3.4.14.jar
+zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 0c7cdfc89..a358b4270 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -58,18 +58,20 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Backend
-| Key | Default | Meaning [...]
-|--------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| kyuubi.backend.engine.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications [...]
-| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications [...]
-| kyuubi.backend.engine.exec.pool.size | 100 | Number of threads in the operation execution thread pool of SQL engine applications [...]
-| kyuubi.backend.engine.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool in SQL engine applications [...]
-| kyuubi.backend.server.event.json.log.path | file:///tmp/kyuubi/events | The location of server events go for the built-in JSON logger [...]
-| kyuubi.backend.server.event.loggers || A comma-separated list of server history loggers, where session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a clas [...]
-| kyuubi.backend.server.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server [...]
-| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server [...]
-| kyuubi.backend.server.exec.pool.size | 100 | Number of threads in the operation execution thread pool of Kyuubi server [...]
-| kyuubi.backend.server.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool of Kyuubi server [...]
+| Key | Default | [...]
+|--------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| kyuubi.backend.engine.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in SQL engine applications [...]
+| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in SQL engine applications [...]
+| kyuubi.backend.engine.exec.pool.size | 100 | Number of threads in the operation execution thread pool of SQL engine applications [...]
+| kyuubi.backend.engine.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool in SQL engine applications [...]
+| kyuubi.backend.server.event.json.log.path | file:///tmp/kyuubi/events | The location of server events go for the built-in JSON logger [...]
+| kyuubi.backend.server.event.kafka.close.timeout | PT5S | Period to wait for Kafka producer of server event handlers to close. [...]
+| kyuubi.backend.server.event.kafka.topic | <undefined> | The topic of server events go for the built-in Kafka logger [...]
+| kyuubi.backend.server.event.loggers || A comma-separated list of server history loggers, where session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.backend.server.event.json.log.path</li> <li>KAFKA: the events will be serialized in JSON format and sent to topic of `kyuubi.backend.server.event.kafka.topic`. Note: For the configs of Kafka producer, please specify them with the prefix: `kyuubi.backend.s [...]
+| kyuubi.backend.server.exec.pool.keepalive.time | PT1M | Time(ms) that an idle async thread of the operation execution thread pool will wait for a new task to arrive before terminating in Kyuubi server [...]
+| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S | Timeout(ms) for the operation execution thread pool to terminate in Kyuubi server [...]
+| kyuubi.backend.server.exec.pool.size | 100 | Number of threads in the operation execution thread pool of Kyuubi server [...]
+| kyuubi.backend.server.exec.pool.wait.queue.size | 100 | Size of the wait queue for the operation execution thread pool of Kyuubi server [...]
### Batch
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a6a594063..8da336102 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2008,12 +2008,34 @@ object KyuubiConf {
.stringConf
.createWithDefault("file:///tmp/kyuubi/events")
+ val SERVER_EVENT_KAFKA_TOPIC: OptionalConfigEntry[String] =
+ buildConf("kyuubi.backend.server.event.kafka.topic")
+ .doc("The topic of server events go for the built-in Kafka logger")
+ .version("1.8.0")
+ .serverOnly
+ .stringConf
+ .createOptional
+
+ val SERVER_EVENT_KAFKA_CLOSE_TIMEOUT: ConfigEntry[Long] =
+ buildConf("kyuubi.backend.server.event.kafka.close.timeout")
+ .doc("Period to wait for Kafka producer of server event handlers to close.")
+ .version("1.8.0")
+ .serverOnly
+ .timeConf
+ .createWithDefault(Duration.ofMillis(5000).toMillis)
+
val SERVER_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.backend.server.event.loggers")
.doc("A comma-separated list of server history loggers, where session/operation etc" +
" events go.<ul>" +
s" <li>JSON: the events will be written to the location of" +
s" ${SERVER_EVENT_JSON_LOG_PATH.key}</li>" +
+ s" <li>KAFKA: the events will be serialized in JSON format" +
+ s" and sent to topic of `${SERVER_EVENT_KAFKA_TOPIC.key}`." +
+ s" Note: For the configs of Kafka producer," +
+ s" please specify them with the prefix: `kyuubi.backend.server.event.kafka.`." +
+ s" For example, `kyuubi.backend.server.event.kafka.bootstrap.servers=127.0.0.1:9092`" +
+ s" </li>" +
s" <li>JDBC: to be done</li>" +
s" <li>CUSTOM: User-defined event handlers.</li></ul>" +
" Note that: Kyuubi supports custom event handlers with the Java SPI." +
@@ -2026,7 +2048,9 @@ object KyuubiConf {
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.toSequence()
- .checkValue(_.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM")), "Unsupported event loggers")
+ .checkValue(
+ _.toSet.subsetOf(Set("JSON", "JDBC", "CUSTOM", "KAFKA")),
+ "Unsupported event loggers")
.createWithDefault(Nil)
@deprecated("using kyuubi.engine.spark.event.loggers instead", "1.6.0")
diff --git a/kyuubi-events/pom.xml b/kyuubi-events/pom.xml
index b97e9dffb..6b51fe015 100644
--- a/kyuubi-events/pom.xml
+++ b/kyuubi-events/pom.xml
@@ -37,6 +37,11 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
index 6c7e0893f..f75e4be4f 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
@@ -51,6 +51,10 @@ trait EventHandlerRegister extends Logging {
throw new KyuubiException(s"Unsupported jdbc event logger.")
}
+ protected def createKafkaEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
+ throw new KyuubiException(s"Unsupported kafka event logger.")
+ }
+
private def loadEventHandler(
eventLoggerType: EventLoggerType,
kyuubiConf: KyuubiConf): Seq[EventHandler[KyuubiEvent]] = {
@@ -64,6 +68,9 @@ trait EventHandlerRegister extends Logging {
case EventLoggerType.JDBC =>
createJdbcEventHandler(kyuubiConf) :: Nil
+ case EventLoggerType.KAFKA =>
+ createKafkaEventHandler(kyuubiConf) :: Nil
+
case EventLoggerType.CUSTOM =>
EventHandlerLoader.loadCustom(kyuubiConf)
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
index a029a0fc5..987982371 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
@@ -21,6 +21,5 @@ object EventLoggerType extends Enumeration {
type EventLoggerType = Value
- // TODO: Only SPARK is done now
- val SPARK, JSON, JDBC, CUSTOM = Value
+ val SPARK, JSON, JDBC, CUSTOM, KAFKA = Value
}
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/KafkaLoggingEventHandler.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/KafkaLoggingEventHandler.scala
new file mode 100644
index 000000000..a245daef0
--- /dev/null
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/KafkaLoggingEventHandler.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import java.time.Duration
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.events.handler.KafkaLoggingEventHandler._
+
+/**
+ * This event logger logs events to Kafka.
+ */
+class KafkaLoggingEventHandler(
+ topic: String,
+ producerConf: Map[String, String],
+ kyuubiConf: KyuubiConf,
+ closeTimeoutInMs: Long) extends EventHandler[KyuubiEvent] with Logging {
+ private def defaultProducerConf: Properties = {
+ val conf = new Properties()
+ conf.setProperty("key.serializer", DEFAULT_SERIALIZER_CLASS)
+ conf.setProperty("value.serializer", DEFAULT_SERIALIZER_CLASS)
+ conf
+ }
+
+ private val normalizedProducerConf: Properties = {
+ val conf = defaultProducerConf
+ producerConf.foreach(p => conf.setProperty(p._1, p._2))
+ conf
+ }
+
+ private val kafkaProducer = new KafkaProducer[String, String](normalizedProducerConf)
+
+ override def apply(event: KyuubiEvent): Unit = {
+ try {
+ val record = new ProducerRecord[String, String](topic, event.eventType, event.toJson)
+ kafkaProducer.send(record)
+ } catch {
+ case e: Exception =>
+ error("Failed to send event in KafkaEventHandler", e)
+ }
+ }
+
+ override def close(): Unit = {
+ kafkaProducer.close(Duration.ofMillis(closeTimeoutInMs))
+ }
+}
+
+object KafkaLoggingEventHandler {
+ private val DEFAULT_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer"
+}
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index c4585b131..54d4507d5 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -395,6 +395,23 @@
<artifactId>swagger-ui</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.dimafeng</groupId>
+ <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.dimafeng</groupId>
+ <artifactId>testcontainers-scala-kafka_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
index 4ddee48dd..ca6c776ac 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
@@ -19,8 +19,9 @@ package org.apache.kyuubi.events
import java.net.InetAddress
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{SERVER_EVENT_JSON_LOG_PATH, SERVER_EVENT_LOGGERS}
-import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler}
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.events.handler.{EventHandler, ServerJsonLoggingEventHandler, ServerKafkaLoggingEventHandler}
+import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
import org.apache.kyuubi.util.KyuubiHadoopUtils
object ServerEventHandlerRegister extends EventHandlerRegister {
@@ -36,6 +37,22 @@ object ServerEventHandlerRegister extends EventHandlerRegister {
kyuubiConf)
}
+ override def createKafkaEventHandler(kyuubiConf: KyuubiConf): EventHandler[KyuubiEvent] = {
+ val topic = kyuubiConf.get(SERVER_EVENT_KAFKA_TOPIC).getOrElse {
+ throw new IllegalArgumentException(s"${SERVER_EVENT_KAFKA_TOPIC.key} must be configured")
+ }
+ val closeTimeoutInMs = kyuubiConf.get(SERVER_EVENT_KAFKA_CLOSE_TIMEOUT)
+ val kafkaEventHandlerProducerConf =
+ kyuubiConf.getAllWithPrefix(KAFKA_SERVER_EVENT_HANDLER_PREFIX, "")
+ .filterKeys(
+ !List(SERVER_EVENT_KAFKA_TOPIC, SERVER_EVENT_KAFKA_CLOSE_TIMEOUT).map(_.key).contains(_))
+ ServerKafkaLoggingEventHandler(
+ topic,
+ kafkaEventHandlerProducerConf,
+ kyuubiConf,
+ closeTimeoutInMs)
+ }
+
override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
conf.get(SERVER_EVENT_LOGGERS)
}
diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandler.scala
similarity index 64%
copy from kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
copy to kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandler.scala
index a029a0fc5..a7421a057 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventLoggerType.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandler.scala
@@ -15,12 +15,17 @@
* limitations under the License.
*/
-package org.apache.kyuubi.events
+package org.apache.kyuubi.events.handler
-object EventLoggerType extends Enumeration {
+import org.apache.kyuubi.config.KyuubiConf
- type EventLoggerType = Value
+case class ServerKafkaLoggingEventHandler(
+ topic: String,
+ producerConf: Map[String, String],
+ kyuubiConf: KyuubiConf,
+ closeTimeoutInMs: Long)
+ extends KafkaLoggingEventHandler(topic, producerConf, kyuubiConf, closeTimeoutInMs)
- // TODO: Only SPARK is done now
- val SPARK, JSON, JDBC, CUSTOM = Value
+object ServerKafkaLoggingEventHandler {
+ val KAFKA_SERVER_EVENT_HANDLER_PREFIX = "kyuubi.backend.server.event.kafka"
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandlerSuite.scala
new file mode 100644
index 000000000..461414f3f
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerKafkaLoggingEventHandlerSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events.handler
+
+import java.time.Duration
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.util.Random
+
+import com.dimafeng.testcontainers.KafkaContainer
+import com.dimafeng.testcontainers.scalatest.TestContainerForAll
+import com.fasterxml.jackson.databind.json.JsonMapper
+import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
+import org.apache.kafka.clients.consumer.KafkaConsumer
+
+import org.apache.kyuubi._
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.handler.ServerKafkaLoggingEventHandler.KAFKA_SERVER_EVENT_HANDLER_PREFIX
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+abstract class ServerKafkaLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCTestHelper
+ with BatchTestHelper with TestContainerForAll {
+
+ /**
+ * `confluentinc/cp-kafka` is Confluent Community Docker Image for Apache Kafka.
+ * The list of compatibility for Kafka's version refers to:
+ * https://docs.confluent.io/platform/current/installation
+ * /versions-interoperability.html#cp-and-apache-ak-compatibility
+ */
+ protected val imageTag: String
+ override lazy val containerDef: KafkaContainer.Def =
+ KafkaContainer.Def(s"confluentinc/cp-kafka:$imageTag")
+ private val destTopic = "server-event-topic"
+ private val mapper = JsonMapper.builder().build()
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ override protected val conf: KyuubiConf = {
+ KyuubiConf()
+ .set(KyuubiConf.SERVER_EVENT_LOGGERS, Seq("KAFKA"))
+ .set(KyuubiConf.SERVER_EVENT_KAFKA_TOPIC, destTopic)
+ }
+
+ override def beforeAll(): Unit = withContainers { kafkaContainer =>
+ val bootstrapServers = kafkaContainer.bootstrapServers
+ createTopic(kafkaContainer.bootstrapServers, destTopic)
+ conf.set(s"$KAFKA_SERVER_EVENT_HANDLER_PREFIX.bootstrap.servers", bootstrapServers)
+
+ super.beforeAll()
+ }
+
+ private def createTopic(kafkaServerUrl: String, topic: String): Unit = {
+ val adminProps = new Properties
+ adminProps.setProperty("bootstrap.servers", kafkaServerUrl)
+ val adminClient = AdminClient.create(adminProps)
+ adminClient.createTopics(List(new NewTopic(topic, 1, 1.toShort)).asJava)
+ adminClient.close()
+ }
+
+ test("check server events sent to kafka topic") {
+ withContainers { kafkaContainer =>
+ val consumerConf = new Properties
+ Map(
+ "bootstrap.servers" -> kafkaContainer.bootstrapServers,
+ "group.id" -> s"server-kafka-logger-test-${Random.nextInt}",
+ "auto.offset.reset" -> "earliest",
+ "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
+ "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")
+ .foreach(p => consumerConf.setProperty(p._1, p._2))
+ val consumer = new KafkaConsumer[String, String](consumerConf)
+ try {
+ consumer.subscribe(List(destTopic).asJava)
+ eventually(timeout(10.seconds), interval(500.milliseconds)) {
+ val records = consumer.poll(Duration.ofMillis(500))
+ assert(records.count() > 0)
+ records.forEach { record =>
+ val jsonObj = mapper.readTree(record.value())
+ assertResult("kyuubi_server_info")(record.key)
+ assertResult(server.getName)(jsonObj.get("serverName").asText())
+ }
+ }
+ } finally {
+ consumer.close()
+ }
+ }
+ }
+}
+
+class ServerKafkaLoggingEventHandlerSuiteForKafka2 extends ServerKafkaLoggingEventHandlerSuite {
+ // equivalent to Apache Kafka 2.8.x
+ override val imageTag = "6.2.10"
+}
+
+class ServerKafkaLoggingEventHandlerSuiteForKafka3 extends ServerKafkaLoggingEventHandlerSuite {
+ // equivalent to Apache Kafka 3.3.x
+ override val imageTag = "7.3.3"
+}
diff --git a/pom.xml b/pom.xml
index af56eb167..2fe54e329 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
<jetty.version>9.4.50.v20221201</jetty.version>
<jline.version>0.9.94</jline.version>
<junit.version>4.13.2</junit.version>
+ <kafka.version>3.4.0</kafka.version>
<kubernetes-client.version>6.4.1</kubernetes-client.version>
<kudu.version>1.15.0</kudu.version>
<ldapsdk.version>6.0.5</ldapsdk.version>
@@ -545,6 +546,12 @@
<version>${testcontainers-scala.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.dimafeng</groupId>
+ <artifactId>testcontainers-scala-kafka_${scala.binary.version}</artifactId>
+ <version>${testcontainers-scala.version}</version>
+ </dependency>
+
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
@@ -1261,6 +1268,13 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <optional>true</optional>
+ </dependency>
+
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.binary.version}</artifactId>