You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by zh...@apache.org on 2022/06/26 05:19:04 UTC

[incubator-shenyu] branch master updated: [ISSUE #2917] Shenyu add logging-kafka plugin (#3609)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b42774ce [ISSUE #2917] Shenyu add logging-kafka plugin (#3609)
5b42774ce is described below

commit 5b42774ce3b3d59bbd662eda46679040f6965754
Author: qifanyyy <45...@users.noreply.github.com>
AuthorDate: Sun Jun 26 01:18:57 2022 -0400

    [ISSUE #2917] Shenyu add logging-kafka plugin (#3609)
    
    * [GSoC 2022] Shenyu add logging-kafka plugin #2917
    
    * Update pom.xml
    
    * Style change
    
    * Update pluginenum style change
    
    * Optimize the version kafka-client to <kafka-clients.version>
---
 pom.xml                                            |   1 +
 shenyu-bootstrap/pom.xml                           |   9 +
 .../org/apache/shenyu/common/enums/PluginEnum.java |   5 +
 .../src/main/release-docs/LICENSE                  |   1 +
 shenyu-plugin/shenyu-plugin-logging/pom.xml        |   1 +
 .../shenyu-plugin-logging-kafka/pom.xml            |  54 +++
 .../plugin/logging/kafka/AbstractLogCollector.java | 124 +++++++
 .../plugin/logging/kafka/DefaultLogCollector.java  |  42 +++
 .../shenyu/plugin/logging/kafka/LogCollector.java  |  38 ++
 .../plugin/logging/kafka/LogConsumeClient.java     |  35 ++
 .../plugin/logging/kafka/LoggingKafkaPlugin.java   |  93 +++++
 .../plugin/logging/kafka/body/BodyWriter.java      | 106 ++++++
 .../kafka/body/LoggingServerHttpRequest.java       |  64 ++++
 .../kafka/body/LoggingServerHttpResponse.java      | 253 +++++++++++++
 .../logging/kafka/config/LogCollectConfig.java     | 269 ++++++++++++++
 .../logging/kafka/constant/LoggingConstant.java    |  32 ++
 .../logging/kafka/entity/LZ4CompressData.java      |  69 ++++
 .../logging/kafka/entity/ShenyuRequestLog.java     | 407 +++++++++++++++++++++
 .../handler/LoggingKafkaPluginDataHandler.java     | 154 ++++++++
 .../logging/kafka/kafka/KafkaLogCollectClient.java | 147 ++++++++
 .../plugin/logging/kafka/sampler/CountSampler.java | 114 ++++++
 .../plugin/logging/kafka/sampler/Sampler.java      |  38 ++
 .../logging/kafka/utils/LogCollectConfigUtils.java | 172 +++++++++
 .../logging/kafka/utils/LogCollectUtils.java       |  61 +++
 .../logging/kafka/DefaultLogCollectorTest.java     |  58 +++
 .../logging/kafka/LoggingKafkaPluginTest.java      |  97 +++++
 .../plugin/logging/kafka/body/BodyWriterTest.java  |  70 ++++
 .../kafka/body/LoggingServerHttpResponseTest.java  | 162 ++++++++
 .../logging/kafka/config/LogCollectConfigTest.java |  92 +++++
 .../logging/kafka/entity/LZ4CompressDataTest.java  |  54 +++
 .../logging/kafka/entity/ShenyuRequestLogTest.java |  85 +++++
 .../handler/LoggingKafkaPluginDataHandlerTest.java | 118 ++++++
 .../kafka/kafka/KafkaLogCollectClientTest.java     |  78 ++++
 .../logging/kafka/sampler/CountSamplerTest.java    |  91 +++++
 .../kafka/utils/LogCollectConfigUtilsTest.java     | 127 +++++++
 .../logging/kafka/utils/LogCollectUtilsTest.java   |  65 ++++
 .../shenyu-spring-boot-starter-plugin/pom.xml      |   1 +
 .../pom.xml                                        |  32 +-
 .../kafka/LoggingKafkaPluginConfiguration.java     |  53 +++
 .../src/main/resources/META-INF/spring.factories   |  19 +
 .../src/main/resources/META-INF/spring.provides    |  18 +
 .../kafka/LoggingKafkaPluginConfigurationTest.java |  65 ++++
 42 files changed, 3563 insertions(+), 11 deletions(-)

diff --git a/pom.xml b/pom.xml
index dc085c110..1f48df31f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
         <jackson-databind.version>2.12.3</jackson-databind.version>
         <jakarta.json-api.version>2.0.1</jakarta.json-api.version>
         <elasticsearch-rest-client.version>8.2.3</elasticsearch-rest-client.version>
+        <kafka-clients.version>3.2.0</kafka-clients.version>
         <!--maven plugin version-->
         <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
         <jacoco-maven-plugin.version>0.8.7</jacoco-maven-plugin.version>
diff --git a/shenyu-bootstrap/pom.xml b/shenyu-bootstrap/pom.xml
index eec17d6e2..b4885b76d 100644
--- a/shenyu-bootstrap/pom.xml
+++ b/shenyu-bootstrap/pom.xml
@@ -459,6 +459,14 @@
         </dependency>
         <!--shenyu logging-rocketmq plugin end-->
 
+        <!--shenyu logging-kafka plugin start-->
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-spring-boot-starter-plugin-logging-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!--shenyu logging-kafka plugin end-->
+      
         <!--shenyu logging-elasticsearch plugin start-->
         <dependency>
             <groupId>org.apache.shenyu</groupId>
@@ -466,6 +474,7 @@
             <version>${project.version}</version>
         </dependency>
         <!--shenyu logging-elasticsearch plugin end-->
+
     </dependencies>
     <profiles>
         <profile>
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
index ba618caee..afc218338 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
@@ -137,6 +137,11 @@ public enum PluginEnum {
      */
     LOGGING_ROCKETMQ(170, 0, "loggingRocketMQ"),
 
+    /**
+     * Logging Kafka plugin enum.
+     */
+    LOGGING_KAFKA(180, 0, "loggingKafka"),
+  
     /**
      * Logging ElasticSearch plugin enum.
      */
diff --git a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
index 1ed493d21..6408e08f4 100644
--- a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
+++ b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
@@ -392,6 +392,7 @@ The text of each license is the standard Apache 2.0 license.
     tomcat-annotations-api 9.0.29: https://tomcat.apache.org/, Apache 2.0
     tomcat-embed-core 9.0.29: https://tomcat.apache.org/, Apache 2.0
     jackson-jr-objects 2.10.1: http://wiki.fasterxml.com/JacksonHome, Apache 2.0
+    kafka-clients 3.2.0: https://kafka.apache.org/, Apache 2.0
     elasticsearch-java 8.2.3: https://github.com/elastic/elasticsearch-java, Apache 2.0
     elasticsearch-rest-client 8.2.3: https://github.com/elastic/elasticsearch, Apache 2.0
 
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-plugin/shenyu-plugin-logging/pom.xml
index beb8864ef..767ca0f17 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-logging/pom.xml
@@ -31,6 +31,7 @@
     <modules>
         <module>shenyu-plugin-logging-console</module>
         <module>shenyu-plugin-logging-rocketmq</module>
+        <module>shenyu-plugin-logging-kafka</module>
         <module>shenyu-plugin-logging-elasticsearch</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/pom.xml b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/pom.xml
new file mode 100644
index 000000000..52f3c64d6
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.shenyu</groupId>
+        <artifactId>shenyu-plugin-logging</artifactId>
+        <version>2.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>shenyu-plugin-logging-kafka</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-plugin-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>${lz4-java.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/AbstractLogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/AbstractLogCollector.java
new file mode 100644
index 000000000..c00e177f3
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/AbstractLogCollector.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.shenyu.common.concurrent.MemorySafeTaskQueue;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor;
+import org.apache.shenyu.common.config.ShenyuConfig;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.utils.ThreadUtils;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base log collector: buffers logs in a bounded queue and pushes them to the client in batches.
+ */
+public abstract class AbstractLogCollector implements LogCollector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractLogCollector.class);
+
+    private final AtomicBoolean started = new AtomicBoolean(true);
+
+    private final ShenyuConfig config = new ShenyuConfig();
+
+    private int bufferSize;
+
+    private BlockingQueue<ShenyuRequestLog> bufferQueue;
+
+    private long lastPushTime;
+
+    @Override
+    public void start() {
+        bufferSize = LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize();
+        bufferQueue = new LinkedBlockingDeque<>(bufferSize);
+        final ShenyuConfig.SharedPool sharedPool = config.getSharedPool();
+        ShenyuThreadPoolExecutor threadExecutor = new ShenyuThreadPoolExecutor(sharedPool.getCorePoolSize(),
+            sharedPool.getMaximumPoolSize(), sharedPool.getKeepAliveTime(), TimeUnit.MILLISECONDS,
+            new MemorySafeTaskQueue<>(Constants.THE_256_MB),
+            ShenyuThreadFactory.create(config.getSharedPool().getPrefix(), true),
+            new ThreadPoolExecutor.AbortPolicy());
+        started.set(true);
+        threadExecutor.execute(this::consume);
+    }
+
+    @Override
+    public void collect(final ShenyuRequestLog log) {
+        // Nothing to do without a log, an allocated queue (start() has run) or a consume client.
+        if (Objects.isNull(log) || Objects.isNull(bufferQueue) || Objects.isNull(getLogConsumeClient())) {
+            return;
+        }
+        // offer() drops the log when the queue is full; size-check + add() could throw under races.
+        bufferQueue.offer(log);
+    }
+
+    /**
+     * Worker loop: drain up to one batch from the buffer and hand it to the consume client.
+     */
+    private void consume() {
+        while (started.get()) {
+            int diffTimeMSForPush = 100;
+            try {
+                List<ShenyuRequestLog> logs = new ArrayList<>();
+                int size = bufferQueue.size();
+                long time = System.currentTimeMillis();
+                long timeDiffMs = time - lastPushTime;
+                int batchSize = 100;
+                if (size >= batchSize || timeDiffMs > diffTimeMSForPush) {
+                    bufferQueue.drainTo(logs, batchSize);
+                    LogConsumeClient logCollectClient = getLogConsumeClient();
+                    if (logCollectClient != null) {
+                        logCollectClient.consume(logs);
+                    }
+                    lastPushTime = time;
+                } else {
+                    ThreadUtils.sleep(TimeUnit.MILLISECONDS, diffTimeMSForPush);
+                }
+            } catch (Exception e) {
+                LOG.error("DefaultLogCollector collect log error", e);
+                ThreadUtils.sleep(TimeUnit.MILLISECONDS, diffTimeMSForPush);
+            }
+        }
+    }
+
+    /**
+     * Get the client that batches are pushed to.
+     *
+     * @return log consume client (this class tolerates a null result)
+     */
+    protected abstract LogConsumeClient getLogConsumeClient();
+
+    @Override
+    public void close() throws Exception {
+        started.set(false);
+        LogConsumeClient logCollectClient = getLogConsumeClient();
+        if (logCollectClient != null) {
+            logCollectClient.close();
+        }
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollector.java
new file mode 100644
index 000000000..29d3d4cab
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import org.apache.shenyu.plugin.logging.kafka.handler.LoggingKafkaPluginDataHandler;
+
+/**
+ * Default log collector; its consume client is obtained from LoggingKafkaPluginDataHandler.
+ */
+public class DefaultLogCollector extends AbstractLogCollector {
+
+    private static final LogCollector INSTANCE = new DefaultLogCollector();
+
+    /**
+     * Get the shared LogCollector instance.
+     *
+     * @return the single instance created when this class is initialized
+     */
+    public static LogCollector getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    protected LogConsumeClient getLogConsumeClient() {
+        return LoggingKafkaPluginDataHandler.getKafkaLogCollectClient();
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogCollector.java
new file mode 100644
index 000000000..f4397f528
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogCollector.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+
+/**
+ * Collector of request logs: start it, hand it logs via collect, and close it when done.
+ */
+public interface LogCollector extends AutoCloseable {
+
+    /**
+     * Start the log collector so that it can accept logs.
+     */
+    void start();
+
+    /**
+     * Hand one log entry to the collector.
+     *
+     * @param log access log to collect
+     */
+    void collect(ShenyuRequestLog log);
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogConsumeClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogConsumeClient.java
new file mode 100644
index 000000000..5d0abc0b7
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogConsumeClient.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.util.List;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+
+/**
+ * Consumes collected logs, e.g. by storing them remotely, in local files, in databases, or elsewhere.
+ */
+public interface LogConsumeClient extends AutoCloseable {
+
+    /**
+     * Consume a batch of logs.
+     *
+     * @param logs the logs to consume
+     * @throws Exception if the implementation fails to consume the logs
+     */
+    void consume(List<ShenyuRequestLog> logs) throws Exception;
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
new file mode 100644
index 000000000..4c3e4ca95
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
+import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.apache.shenyu.plugin.logging.kafka.body.LoggingServerHttpRequest;
+import org.apache.shenyu.plugin.logging.kafka.body.LoggingServerHttpResponse;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+import static org.apache.shenyu.common.enums.PluginEnum.LOGGING_KAFKA;
+
+/**
+ * Plugin that wraps the exchange with logging decorators wired to the default log collector.
+ */
+public class LoggingKafkaPlugin extends AbstractShenyuPlugin {
+
+    private static final String USER_AGENT = "User-Agent";
+
+    private static final String HOST = "Host";
+
+    @Override
+    protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain,
+                                   final SelectorData selector, final RuleData rule) {
+        ServerHttpRequest request = exchange.getRequest();
+        // requests that are not sampled continue down the chain untouched
+        if (!LogCollectConfigUtils.isSampled(request)) {
+            return chain.execute(exchange);
+        }
+
+        ShenyuRequestLog requestLog = new ShenyuRequestLog();
+        requestLog.setRequestUri(request.getURI().toString());
+        requestLog.setMethod(request.getMethodValue());
+        requestLog.setRequestHeader(LogCollectUtils.getHeaders(request.getHeaders()));
+        requestLog.setQueryParams(request.getURI().getQuery());
+        requestLog.setClientIp(HostAddressUtils.acquireIp(exchange));
+        requestLog.setUserAgent(request.getHeaders().getFirst(USER_AGENT));
+        requestLog.setHost(request.getHeaders().getFirst(HOST));
+        requestLog.setPath(request.getURI().getPath());
+
+        LoggingServerHttpRequest decoratedRequest = new LoggingServerHttpRequest(request, requestLog);
+        LoggingServerHttpResponse decoratedResponse = new LoggingServerHttpResponse(exchange.getResponse(),
+            requestLog, DefaultLogCollector.getInstance());
+        ServerWebExchange loggingExchange = exchange.mutate().request(decoratedRequest)
+            .response(decoratedResponse).build();
+        decoratedResponse.setExchange(loggingExchange);
+
+        return chain.execute(loggingExchange).doOnError(decoratedResponse::logError);
+    }
+
+    /**
+     * Order of this plugin, taken from the LOGGING_KAFKA enum code.
+     *
+     * @return plugin order
+     */
+    @Override
+    public int getOrder() {
+        return LOGGING_KAFKA.getCode();
+    }
+
+    /**
+     * Name of this plugin, taken from the LOGGING_KAFKA enum name.
+     *
+     * @return plugin name
+     */
+    @Override
+    public String named() {
+        return LOGGING_KAFKA.getName();
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriter.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriter.java
new file mode 100644
index 000000000..e0660038f
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Accumulates body bytes in memory so they can later be emitted as a UTF-8 string.
+ */
+public class BodyWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BodyWriter.class);
+
+    private final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+    private final WritableByteChannel channel = Channels.newChannel(stream);
+
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    /**
+     * Append the buffer's bytes; ignored once this writer has been marked closed.
+     *
+     * @param buffer byte buffer
+     */
+    public void write(final ByteBuffer buffer) {
+        if (isClosed.get()) {
+            return;
+        }
+        try {
+            channel.write(buffer);
+        } catch (IOException ex) {
+            isClosed.set(true);
+            LOG.error("write buffer Failed.", ex);
+        }
+    }
+
+    /**
+     * Tell whether nothing has been written so far.
+     *
+     * @return true: stream is empty
+     */
+    boolean isEmpty() {
+        return stream.size() == 0;
+    }
+
+    /**
+     * Number of bytes accumulated so far.
+     *
+     * @return size of stream
+     */
+    public int size() {
+        return stream.size();
+    }
+
+    /**
+     * Emit the accumulated bytes as a UTF-8 string, then close the stream and channel.
+     *
+     * @return string of stream, or an empty string when nothing was written
+     */
+    public String output() {
+        if (isEmpty()) {
+            return "";
+        }
+        try {
+            isClosed.set(true);
+            return new String(stream.toByteArray(), StandardCharsets.UTF_8);
+        } catch (Exception ex) {
+            LOG.error("Write failed: ", ex);
+            return "Write failed: " + ex.getMessage();
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException closeStreamEx) {
+                LOG.error("Close stream error: ", closeStreamEx);
+            }
+            try {
+                channel.close();
+            } catch (IOException closeChannelEx) {
+                LOG.error("Close channel error: ", closeChannelEx);
+            }
+        }
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpRequest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpRequest.java
new file mode 100644
index 000000000..a19f02125
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
+import reactor.core.publisher.Flux;
+import reactor.util.annotation.NonNull;
+
+/**
+ * ServerHttpRequest decorator that copies the request body into the log entry as it is read.
+ */
+public class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
+
+    private final ShenyuRequestLog logInfo;
+
+    public LoggingServerHttpRequest(final ServerHttpRequest delegate, final ShenyuRequestLog logInfo) {
+        super(delegate);
+        this.logInfo = logInfo;
+    }
+
+    /**
+     * Get the request body, recording non-binary content while it streams through.
+     *
+     * @return Flux of the delegate's body buffers
+     */
+    @Override
+    @NonNull
+    public Flux<DataBuffer> getBody() {
+        final BodyWriter bodyWriter = new BodyWriter();
+        return super.getBody().doOnNext(chunk -> {
+            if (LogCollectUtils.isNotBinaryType(getHeaders())) {
+                bodyWriter.write(chunk.asByteBuffer().asReadOnlyBuffer());
+            }
+        }).doFinally(signalType -> {
+            int bodySize = bodyWriter.size();
+            String bodyText = bodyWriter.output();
+            boolean tooLarge = LogCollectConfigUtils.isRequestBodyTooLarge(bodySize);
+            // only record a non-empty body that is not reported as too large
+            if (bodySize != 0 && !tooLarge) {
+                logInfo.setRequestBody(bodyText);
+            }
+        });
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponse.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponse.java
new file mode 100644
index 000000000..1cbe9a085
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponse.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.common.utils.DateUtils;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.result.ShenyuResult;
+import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
+import org.apache.shenyu.plugin.logging.kafka.DefaultLogCollector;
+import org.apache.shenyu.plugin.logging.kafka.LogCollector;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import org.springframework.web.server.ResponseStatusException;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.annotation.NonNull;
+
+/**
+ * Response decorator that records the outgoing response into a {@link ShenyuRequestLog}.
+ *
+ * <p>While the body is written, its chunks are copied into a {@link BodyWriter}; once the body
+ * publisher terminates, the log entry is completed and handed to the {@link LogCollector}.
+ * {@link #setExchange(ServerWebExchange)} has to be called before the response is written or
+ * {@link #logError(Throwable)} is used, because both paths read attributes from the exchange.
+ */
+public class LoggingServerHttpResponse extends ServerHttpResponseDecorator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingServerHttpResponse.class);
+
+    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+    private final ShenyuRequestLog logInfo;
+
+    private final LogCollector logCollector;
+
+    private ServerWebExchange exchange;
+
+    /**
+     * Constructor LoggingServerHttpResponse.
+     *
+     * @param delegate     delegate ServerHttpResponse
+     * @param logInfo      access log entry that is filled in by this decorator
+     * @param logCollector collector that receives the finished log entry; may be null
+     */
+    public LoggingServerHttpResponse(final ServerHttpResponse delegate, final ShenyuRequestLog logInfo,
+                                     final LogCollector logCollector) {
+        super(delegate);
+        this.logInfo = logInfo;
+        this.logCollector = logCollector;
+    }
+
+    /**
+     * set relevant ServerWebExchange.
+     *
+     * @param exchange ServerWebExchange whose attributes supply the context, trace id and upstream address
+     */
+    public void setExchange(final ServerWebExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /**
+     * write with a publisher, capturing the body for the access log on the way.
+     *
+     * @param body response body
+     * @return Mono
+     */
+    @Override
+    @NonNull
+    public Mono<Void> writeWith(@NonNull final Publisher<? extends DataBuffer> body) {
+        return super.writeWith(appendResponse(body));
+    }
+
+    /**
+     * Records status, headers and trace id immediately and wraps the body so that every chunk is
+     * copied into a {@link BodyWriter}; the log entry is completed when the body terminates.
+     *
+     * @param body publisher
+     * @return wrap Flux
+     */
+    @NonNull
+    private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends DataBuffer> body) {
+        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+        assert shenyuContext != null;
+        if (getStatusCode() != null) {
+            logInfo.setStatus(getStatusCode().value());
+        }
+        logInfo.setResponseHeader(LogCollectUtils.getHeaders(getHeaders()));
+        BodyWriter writer = new BodyWriter();
+        logInfo.setTraceId(getTraceId());
+        return Flux.from(body).doOnNext(buffer -> {
+            if (LogCollectUtils.isNotBinaryType(getHeaders())) {
+                writer.write(buffer.asByteBuffer().asReadOnlyBuffer());
+            }
+        }).doFinally(signal -> logResponse(shenyuContext, writer));
+    }
+
+    /**
+     * record response log.
+     *
+     * @param shenyuContext request context
+     * @param writer        bodyWriter holding the captured response body
+     */
+    private void logResponse(final ShenyuContext shenyuContext, final BodyWriter writer) {
+        int size = writer.size();
+        logInfo.setResponseContentLength(resolveContentLength(size));
+        recordContextInfo(shenyuContext);
+        String body = writer.output();
+        if (size > 0 && !LogCollectConfigUtils.isResponseBodyTooLarge(size)) {
+            logInfo.setResponseBody(body);
+        }
+        // collect log
+        if (logCollector != null) {
+            logCollector.collect(logInfo);
+        }
+    }
+
+    /**
+     * Determines the response length to log: the Content-Length header when it is a valid
+     * non-negative number (capped at {@link Integer#MAX_VALUE}), otherwise the fallback.
+     *
+     * @param fallback value used when the header is absent, blank, negative or not numeric
+     * @return content length to record
+     */
+    private int resolveContentLength(final int fallback) {
+        String header = getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH);
+        if (StringUtils.isBlank(header)) {
+            return fallback;
+        }
+        try {
+            // Parse as long: a length above Integer.MAX_VALUE must not abort logging.
+            long declared = Long.parseLong(header.trim());
+            if (declared < 0) {
+                return fallback;
+            }
+            return (int) Math.min(declared, Integer.MAX_VALUE);
+        } catch (NumberFormatException e) {
+            LOG.warn("invalid Content-Length header: {}", header);
+            return fallback;
+        }
+    }
+
+    /**
+     * Copies the fields that come from the request context (start time, module, cost time,
+     * method, rpc type and, when an rpc type is present, the upstream ip) into the log entry.
+     *
+     * @param shenyuContext request context
+     */
+    private void recordContextInfo(final ShenyuContext shenyuContext) {
+        logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
+        logInfo.setModule(shenyuContext.getModule());
+        long costTime = DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(), LocalDateTime.now());
+        logInfo.setUpstreamResponseTime(costTime);
+        logInfo.setMethod(shenyuContext.getMethod());
+        logInfo.setRpcType(shenyuContext.getRpcType());
+        if (StringUtils.isNotBlank(shenyuContext.getRpcType())) {
+            logInfo.setUpstreamIp(getUpstreamIp());
+        }
+    }
+
+    /**
+     * get upstream ip.
+     *
+     * <p>For the HTTP rpc type the host of the {@code Constants.HTTP_URI} attribute is used, with
+     * the {@code Constants.HTTP_DOMAIN} attribute as fallback; other rpc types only consult the
+     * domain attribute.
+     *
+     * @return upstream ip, or an empty string when it cannot be determined
+     */
+    private String getUpstreamIp() {
+        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+        assert shenyuContext != null;
+        if (RpcTypeEnum.HTTP.getName().equals(shenyuContext.getRpcType())) {
+            URI uri = exchange.getAttribute(Constants.HTTP_URI);
+            if (uri != null) {
+                return uri.getHost();
+            } else {
+                return getUpstreamIpFromHttpDomain();
+            }
+        } else {
+            String domain = (String) exchange.getAttributes().get(Constants.HTTP_DOMAIN);
+            if (StringUtils.isNotBlank(domain)) {
+                return getUpstreamIpFromHttpDomain();
+            }
+            // The current context is difficult to obtain the upstream IP of grpc and Dubbo. need change plugin code.
+        }
+        return "";
+    }
+
+    /**
+     * Encourage developers to provide plugins to upstream like SkyWalking, ZipKin and OpenTelemetry
+     * to implement the tracing features. These plug-ins can set a traceId to the context.
+     *
+     * @return traceId stored in the exchange attributes, or null when none was set
+     */
+    private String getTraceId() {
+        return (String) exchange.getAttributes().get(LoggingConstant.SHENYU_AGENT_TRACE_ID);
+    }
+
+    /**
+     * collect access error.
+     *
+     * <p>The status is taken from a {@link ResponseStatusException} when available and defaults
+     * to 500 otherwise. The entry is sent to the collector given at construction time; when that
+     * collector is null the default collector is used instead.
+     *
+     * @param throwable Exception occurred.
+     */
+    public void logError(final Throwable throwable) {
+        HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
+        if (throwable instanceof ResponseStatusException) {
+            httpStatus = ((ResponseStatusException) throwable).getStatus();
+        }
+        logInfo.setStatus(httpStatus.value());
+        logInfo.setTraceId(getTraceId());
+        // Do not collect stack
+        Object result = ShenyuResultWrap.error(exchange, httpStatus.value(),
+            httpStatus.getReasonPhrase(), throwable.getMessage());
+        final ShenyuResult<?> shenyuResult = ShenyuResultWrap.shenyuResult();
+        Object resultData = shenyuResult.format(exchange, result);
+        final Object responseData = shenyuResult.result(exchange, resultData);
+        assert null != responseData;
+        final byte[] bytes = (responseData instanceof byte[])
+            ? (byte[]) responseData
+            : responseData.toString().getBytes(StandardCharsets.UTF_8);
+        logInfo.setResponseContentLength(bytes.length);
+        ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+        assert shenyuContext != null;
+        recordContextInfo(shenyuContext);
+        logInfo.setResponseHeader(LogCollectUtils.getHeaders(exchange.getResponse().getHeaders()));
+
+        int size = bytes.length;
+        String body = new String(bytes, StandardCharsets.UTF_8);
+        if (size > 0 && !LogCollectConfigUtils.isResponseBodyTooLarge(size)) {
+            logInfo.setResponseBody(body);
+        }
+        // collect log: prefer the injected collector so both logging paths report to the same place
+        LogCollector collector = logCollector != null ? logCollector : DefaultLogCollector.getInstance();
+        if (collector != null) {
+            collector.collect(logInfo);
+        }
+    }
+
+    /**
+     * Extracts the host from the {@code Constants.HTTP_DOMAIN} exchange attribute.
+     *
+     * @return host of the domain url, or an empty string when the attribute is blank or not a valid url
+     */
+    private String getUpstreamIpFromHttpDomain() {
+        String domain = (String) exchange.getAttributes().get(Constants.HTTP_DOMAIN);
+        try {
+            if (StringUtils.isNotBlank(domain)) {
+                URL url = new URL(domain);
+                return url.getHost();
+            }
+        } catch (Exception e) {
+            // keep the cause so a malformed domain can be diagnosed from the log
+            LOG.error("get upstream ip error", e);
+        }
+        return "";
+    }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfig.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfig.java
new file mode 100644
index 000000000..12d1b3350
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfig.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.config;
+
+/**
+ * Configuration of the log collector, including the kafka settings.
+ * Only topic and nameserver are mandatory; every other option may be omitted.
+ * The configuration is meant to be managed through admin rather than through the configuration file.
+ */
+public class LogCollectConfig {
+
+    private GlobalLogConfig globalLogConfig;
+
+    /**
+     * Returns the global log configuration.
+     *
+     * @return the global log configuration, or null when none has been set
+     */
+    public GlobalLogConfig getGlobalLogConfig() {
+        return this.globalLogConfig;
+    }
+
+    /**
+     * Replaces the global log configuration.
+     *
+     * @param globalLogConfig the new global log configuration
+     */
+    public void setGlobalLogConfig(final GlobalLogConfig globalLogConfig) {
+        this.globalLogConfig = globalLogConfig;
+    }
+
+    /**
+     * Settings that apply to the log collector as a whole.
+     */
+    public static class GlobalLogConfig {
+        private String topic;
+
+        private String namesrvAddr;
+
+        private String producerGroup;
+
+        private String sampleRate = "1";
+
+        private String compressAlg;
+
+        /**
+         * Defaults to 524288 (512KB).
+         */
+        private int maxResponseBody = 524288;
+
+        /**
+         * Defaults to 524288 (512KB).
+         */
+        private int maxRequestBody = 524288;
+
+        private int bufferQueueSize = 50000;
+
+        /**
+         * Returns the sample rate.
+         *
+         * @return the sample rate as text; "1" unless changed
+         */
+        public String getSampleRate() {
+            return this.sampleRate;
+        }
+
+        /**
+         * Changes the sample rate.
+         *
+         * @param sampleRate the new sample rate
+         */
+        public void setSampleRate(final String sampleRate) {
+            this.sampleRate = sampleRate;
+        }
+
+        /**
+         * Returns the configured compress alg.
+         *
+         * @return the compress alg value, or null when none has been set
+         */
+        public String getCompressAlg() {
+            return this.compressAlg;
+        }
+
+        /**
+         * Changes the compress alg.
+         *
+         * @param compressAlg the new compress alg value
+         */
+        public void setCompressAlg(final String compressAlg) {
+            this.compressAlg = compressAlg;
+        }
+
+        /**
+         * Returns the message queue topic.
+         *
+         * @return the message queue topic
+         */
+        public String getTopic() {
+            return this.topic;
+        }
+
+        /**
+         * Changes the topic used for the message queue.
+         *
+         * @param topic the mq topic
+         */
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+
+        /**
+         * Returns the max response body setting.
+         *
+         * @return the max response body; 524288 unless changed
+         */
+        public int getMaxResponseBody() {
+            return this.maxResponseBody;
+        }
+
+        /**
+         * Changes the max response body setting.
+         *
+         * @param maxResponseBody the new max response body
+         */
+        public void setMaxResponseBody(final int maxResponseBody) {
+            this.maxResponseBody = maxResponseBody;
+        }
+
+        /**
+         * Returns the max request body setting.
+         *
+         * @return the max request body; 524288 unless changed
+         */
+        public int getMaxRequestBody() {
+            return this.maxRequestBody;
+        }
+
+        /**
+         * Changes the max request body setting.
+         *
+         * @param maxRequestBody the new max request body
+         */
+        public void setMaxRequestBody(final int maxRequestBody) {
+            this.maxRequestBody = maxRequestBody;
+        }
+
+        /**
+         * Returns the buffer queue size.
+         *
+         * @return the buffer queue size; 50000 unless changed
+         */
+        public int getBufferQueueSize() {
+            return this.bufferQueueSize;
+        }
+
+        /**
+         * Changes the buffer queue size.
+         *
+         * @param bufferQueueSize the new buffer queue size
+         */
+        public void setBufferQueueSize(final int bufferQueueSize) {
+            this.bufferQueueSize = bufferQueueSize;
+        }
+
+        /**
+         * Returns the kafka nameserver address.
+         *
+         * @return the kafka nameserver address
+         */
+        public String getNamesrvAddr() {
+            return this.namesrvAddr;
+        }
+
+        /**
+         * Changes the kafka nameserver address.
+         *
+         * @param namesrvAddr the kafka nameserver address
+         */
+        public void setNamesrvAddr(final String namesrvAddr) {
+            this.namesrvAddr = namesrvAddr;
+        }
+
+        /**
+         * Returns the producer group.
+         *
+         * @return the producer group
+         */
+        public String getProducerGroup() {
+            return this.producerGroup;
+        }
+
+        /**
+         * Changes the producer group.
+         *
+         * @param producerGroup the new producer group
+         */
+        public void setProducerGroup(final String producerGroup) {
+            this.producerGroup = producerGroup;
+        }
+    }
+
+    /**
+     * Settings that apply to the logging of a single api.
+     */
+    public static class LogApiConfig {
+
+        /**
+         * 0 disables sampling, 1 samples every request. The smallest probability is 0.01, i.e. 1% of logging.
+         */
+        private String sampleRate;
+
+        /**
+         * Only relevant when logs are collected through message queuing.
+         */
+        private String topic;
+
+        /**
+         * Returns the sample rate.
+         *
+         * @return the sample rate
+         */
+        public String getSampleRate() {
+            return this.sampleRate;
+        }
+
+        /**
+         * Changes the sample rate.
+         *
+         * @param sampleRate the new sample rate
+         */
+        public void setSampleRate(final String sampleRate) {
+            this.sampleRate = sampleRate;
+        }
+
+        /**
+         * Returns the mq topic.
+         *
+         * @return the mq topic
+         */
+        public String getTopic() {
+            return this.topic;
+        }
+
+        /**
+         * Changes the mq topic.
+         *
+         * @param topic the new mq topic
+         */
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+    }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/constant/LoggingConstant.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/constant/LoggingConstant.java
new file mode 100644
index 000000000..1083aaa81
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/constant/LoggingConstant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.constant;
+
+/**
+ * Some log related property constants.
+ */
+public final class LoggingConstant {
+
+    public static final String TOPIC = "topic";
+
+    public static final String NAMESERVER_ADDRESS = "namesrvAddr";
+
+    public static final String SHENYU_AGENT_TRACE_ID = "shenyu-agent-trace-id";
+
+}
+
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressData.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressData.java
new file mode 100644
index 000000000..e6c4ee6a8
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+/**
+ * Holder for Lz4 compressed data together with the size of the original input.
+ */
+public class LZ4CompressData {
+
+    private int length;
+
+    private byte[] compressedData;
+
+    /**
+     * Creates a holder from the original size and the compressed bytes.
+     * The array is stored as given, without copying.
+     *
+     * @param length         the exact size of the original input
+     * @param compressedData the compressed data
+     */
+    public LZ4CompressData(final int length, final byte[] compressedData) {
+        this.length = length;
+        this.compressedData = compressedData;
+    }
+
+    /**
+     * Returns the exact size of the original input.
+     *
+     * @return size of the original input
+     */
+    public int getLength() {
+        return this.length;
+    }
+
+    /**
+     * Changes the recorded size of the original input.
+     *
+     * @param length the exact size of the original input
+     */
+    public void setLength(final int length) {
+        this.length = length;
+    }
+
+    /**
+     * Returns the compressed data; the stored array itself is returned, not a copy.
+     *
+     * @return compressed bytes
+     */
+    public byte[] getCompressedData() {
+        return this.compressedData;
+    }
+
+    /**
+     * Replaces the compressed data; the array is stored as given, without copying.
+     *
+     * @param compressedData the compressed data
+     */
+    public void setCompressedData(final byte[] compressedData) {
+        this.compressedData = compressedData;
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLog.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLog.java
new file mode 100644
index 000000000..ffd44afaa
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLog.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+/**
+ * Access log entry of the shenyu gateway; a plain holder for the values recorded about one request.
+ */
+public class ShenyuRequestLog {
+
+    private String clientIp;
+
+    private String timeLocal;
+
+    private String method;
+
+    private String requestHeader;
+
+    private String responseHeader;
+
+    private String queryParams;
+
+    private String requestBody;
+
+    private String requestUri;
+
+    private String responseBody;
+
+    private int responseContentLength;
+
+    private String rpcType;
+
+    private int status;
+
+    private String upstreamIp;
+
+    private long upstreamResponseTime;
+
+    private String userAgent;
+
+    private String host;
+
+    private String module;
+
+    private String traceId;
+
+    /**
+     * request path.
+     */
+    private String path;
+
+    /**
+     * Returns the module.
+     *
+     * @return the module
+     */
+    public String getModule() {
+        return this.module;
+    }
+
+    /**
+     * Stores the module.
+     *
+     * @param module the module
+     */
+    public void setModule(final String module) {
+        this.module = module;
+    }
+
+    /**
+     * Returns the response content length.
+     *
+     * @return the response content length
+     */
+    public int getResponseContentLength() {
+        return this.responseContentLength;
+    }
+
+    /**
+     * Stores the response content length.
+     *
+     * @param responseContentLength the response content length
+     */
+    public void setResponseContentLength(final int responseContentLength) {
+        this.responseContentLength = responseContentLength;
+    }
+
+    /**
+     * Returns the user agent.
+     *
+     * @return the user agent
+     */
+    public String getUserAgent() {
+        return this.userAgent;
+    }
+
+    /**
+     * Stores the user agent.
+     *
+     * @param userAgent the user agent
+     */
+    public void setUserAgent(final String userAgent) {
+        this.userAgent = userAgent;
+    }
+
+    /**
+     * Returns the host.
+     *
+     * @return the host
+     */
+    public String getHost() {
+        return this.host;
+    }
+
+    /**
+     * Stores the host.
+     *
+     * @param host the host
+     */
+    public void setHost(final String host) {
+        this.host = host;
+    }
+
+    /**
+     * Returns the client ip.
+     *
+     * @return the client ip
+     */
+    public String getClientIp() {
+        return this.clientIp;
+    }
+
+    /**
+     * Stores the client ip.
+     *
+     * @param clientIp the client ip
+     */
+    public void setClientIp(final String clientIp) {
+        this.clientIp = clientIp;
+    }
+
+    /**
+     * Returns the local time, kept as already formatted text.
+     *
+     * @return the local time
+     */
+    public String getTimeLocal() {
+        return this.timeLocal;
+    }
+
+    /**
+     * Stores the local time as already formatted text.
+     *
+     * @param timeLocal the local time
+     */
+    public void setTimeLocal(final String timeLocal) {
+        this.timeLocal = timeLocal;
+    }
+
+    /**
+     * Returns the method.
+     *
+     * @return the method
+     */
+    public String getMethod() {
+        return this.method;
+    }
+
+    /**
+     * Stores the method.
+     *
+     * @param method the method
+     */
+    public void setMethod(final String method) {
+        this.method = method;
+    }
+
+    /**
+     * Returns the request header text.
+     *
+     * @return the request header
+     */
+    public String getRequestHeader() {
+        return this.requestHeader;
+    }
+
+    /**
+     * Stores the request header text.
+     *
+     * @param requestHeader the request header
+     */
+    public void setRequestHeader(final String requestHeader) {
+        this.requestHeader = requestHeader;
+    }
+
+    /**
+     * Returns the response header text.
+     *
+     * @return the response header
+     */
+    public String getResponseHeader() {
+        return this.responseHeader;
+    }
+
+    /**
+     * Stores the response header text.
+     *
+     * @param responseHeader the response header
+     */
+    public void setResponseHeader(final String responseHeader) {
+        this.responseHeader = responseHeader;
+    }
+
+    /**
+     * Returns the query params.
+     *
+     * @return the query params
+     */
+    public String getQueryParams() {
+        return this.queryParams;
+    }
+
+    /**
+     * Stores the query params.
+     *
+     * @param queryParams the query params
+     */
+    public void setQueryParams(final String queryParams) {
+        this.queryParams = queryParams;
+    }
+
+    /**
+     * Returns the request body.
+     *
+     * @return the request body
+     */
+    public String getRequestBody() {
+        return this.requestBody;
+    }
+
+    /**
+     * Stores the request body.
+     *
+     * @param requestBody the request body
+     */
+    public void setRequestBody(final String requestBody) {
+        this.requestBody = requestBody;
+    }
+
+    /**
+     * Returns the request uri.
+     *
+     * @return the request uri
+     */
+    public String getRequestUri() {
+        return this.requestUri;
+    }
+
+    /**
+     * Stores the request uri.
+     *
+     * @param requestUri the request uri
+     */
+    public void setRequestUri(final String requestUri) {
+        this.requestUri = requestUri;
+    }
+
+    /**
+     * Returns the response body.
+     *
+     * @return the response body
+     */
+    public String getResponseBody() {
+        return this.responseBody;
+    }
+
+    /**
+     * Stores the response body.
+     *
+     * @param responseBody the response body
+     */
+    public void setResponseBody(final String responseBody) {
+        this.responseBody = responseBody;
+    }
+
+    /**
+     * Returns the rpc type.
+     *
+     * @return the rpc type
+     */
+    public String getRpcType() {
+        return this.rpcType;
+    }
+
+    /**
+     * Stores the rpc type.
+     *
+     * @param rpcType the rpc type
+     */
+    public void setRpcType(final String rpcType) {
+        this.rpcType = rpcType;
+    }
+
+    /**
+     * Returns the status.
+     *
+     * @return the status
+     */
+    public int getStatus() {
+        return this.status;
+    }
+
+    /**
+     * Stores the status.
+     *
+     * @param status the status
+     */
+    public void setStatus(final int status) {
+        this.status = status;
+    }
+
+    /**
+     * Returns the upstream ip.
+     *
+     * @return the upstream ip
+     */
+    public String getUpstreamIp() {
+        return this.upstreamIp;
+    }
+
+    /**
+     * Stores the upstream ip.
+     *
+     * @param upstreamIp the upstream ip
+     */
+    public void setUpstreamIp(final String upstreamIp) {
+        this.upstreamIp = upstreamIp;
+    }
+
+    /**
+     * Returns the upstream response time.
+     *
+     * @return the upstream response time
+     */
+    public long getUpstreamResponseTime() {
+        return this.upstreamResponseTime;
+    }
+
+    /**
+     * Stores the upstream response time.
+     *
+     * @param upstreamResponseTime the upstream response time
+     */
+    public void setUpstreamResponseTime(final long upstreamResponseTime) {
+        this.upstreamResponseTime = upstreamResponseTime;
+    }
+
+    /**
+     * Returns the trace id.
+     *
+     * @return the trace id
+     */
+    public String getTraceId() {
+        return this.traceId;
+    }
+
+    /**
+     * Stores the trace id.
+     *
+     * @param traceId the tracing id
+     */
+    public void setTraceId(final String traceId) {
+        this.traceId = traceId;
+    }
+
+    /**
+     * Returns the request path.
+     *
+     * @return the request path
+     */
+    public String getPath() {
+        return this.path;
+    }
+
+    /**
+     * Stores the request path.
+     *
+     * @param path the request path
+     */
+    public void setPath(final String path) {
+        this.path = path;
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
new file mode 100644
index 000000000..49b9aa84e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.handler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.common.enums.SelectorTypeEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.kafka.DefaultLogCollector;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.kafka.KafkaLogCollectClient;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The type logging kafka plugin data handler.
+ *
+ * <p>Plugin data starts or closes the kafka producer and the log collector;
+ * selector data is cached in two static maps keyed by selector id
+ * (uri list and per-api log config).
+ */
+public class LoggingKafkaPluginDataHandler implements PluginDataHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingKafkaPluginDataHandler.class);
+
+    private static final KafkaLogCollectClient KAFKA_LOG_COLLECT_CLIENT = new KafkaLogCollectClient();
+
+    private static final String EMPTY_JSON = "{}";
+
+    private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP = new ConcurrentHashMap<>();
+
+    private static final Map<String, LogCollectConfig.LogApiConfig> SELECT_API_CONFIG_MAP = new ConcurrentHashMap<>();
+
+    /**
+     * get kafka log collect client.
+     *
+     * @return kafka log collect client.
+     */
+    public static KafkaLogCollectClient getKafkaLogCollectClient() {
+        return KAFKA_LOG_COLLECT_CLIENT;
+    }
+
+    /**
+     * get selectId uriList map.
+     *
+     * @return selectId uriList map
+     */
+    public static Map<String, List<String>> getSelectIdUriListMap() {
+        return SELECT_ID_URI_LIST_MAP;
+    }
+
+    /**
+     * get select api config map.
+     *
+     * @return select api config map
+     */
+    public static Map<String, LogCollectConfig.LogApiConfig> getSelectApiConfigMap() {
+        return SELECT_API_CONFIG_MAP;
+    }
+
+    /**
+     * start or close kafka client.
+     *
+     * <p>When the plugin is enabled the global config is parsed from the plugin config json,
+     * published through {@link LogCollectConfigUtils}, and used to initialise the kafka producer
+     * before the log collector is started. In every other case (disabled, enabled flag missing,
+     * plugin data missing) the log collector is closed.
+     *
+     * @param pluginData plugin data pushed to this handler
+     */
+    @Override
+    public void handlerPlugin(final PluginData pluginData) {
+        LOG.info("handler loggingKafka Plugin data:{}", GsonUtils.getGson().toJson(pluginData));
+        // null-safe check: a missing plugin or a missing enabled flag is treated as "disabled"
+        if (pluginData != null && Boolean.TRUE.equals(pluginData.getEnabled())) {
+            LogCollectConfig.GlobalLogConfig globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
+                LogCollectConfig.GlobalLogConfig.class);
+            if (globalLogConfig == null) {
+                // nothing to connect to; keep the current state instead of failing on a null config
+                LOG.error("loggingKafka plugin config is empty, kafka producer is not started");
+                return;
+            }
+
+            LogCollectConfigUtils.setGlobalConfig(globalLogConfig);
+            // start kafka producer
+            Properties properties = new Properties();
+            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalLogConfig.getNamesrvAddr());
+            properties.put(LoggingConstant.TOPIC, globalLogConfig.getTopic());
+            // the name server address entry must carry the address, not the topic
+            properties.put(LoggingConstant.NAMESERVER_ADDRESS, globalLogConfig.getNamesrvAddr());
+            KAFKA_LOG_COLLECT_CLIENT.initProducer(properties);
+            DefaultLogCollector.getInstance().start();
+        } else {
+            try {
+                DefaultLogCollector.getInstance().close();
+            } catch (Exception e) {
+                LOG.error("close log collector error", e);
+            }
+        }
+    }
+
+    /**
+     * cache the uri list and the api log config of a selector.
+     *
+     * <p>The selector is ignored when its handle is empty or "{}", when it is not a custom-flow
+     * selector, when it has no conditions, or when the parsed handle has a blank topic or
+     * sample rate. Only conditions with param type "uri", a non-blank value and the operator
+     * "match" or "=" contribute to the uri list.
+     *
+     * @param selectorData selector data pushed to this handler
+     */
+    @Override
+    public void handlerSelector(final SelectorData selectorData) {
+        LOG.info("handler loggingKafka selector data:{}", GsonUtils.getGson().toJson(selectorData));
+        String handleJson = selectorData.getHandle();
+        if (StringUtils.isEmpty(handleJson) || EMPTY_JSON.equals(handleJson.trim())) {
+            return;
+        }
+        if (selectorData.getType() != SelectorTypeEnum.CUSTOM_FLOW.getCode()
+            || CollectionUtils.isEmpty(selectorData.getConditionList())) {
+            return;
+        }
+
+        LogCollectConfig.LogApiConfig logApiConfig = GsonUtils.getInstance().fromJson(handleJson,
+            LogCollectConfig.LogApiConfig.class);
+        // a handle that parses to nothing is treated like an incomplete handle
+        if (logApiConfig == null
+            || StringUtils.isBlank(logApiConfig.getTopic()) || StringUtils.isBlank(logApiConfig.getSampleRate())) {
+            return;
+        }
+        List<String> uriList = new ArrayList<>();
+        for (ConditionData conditionData : selectorData.getConditionList()) {
+            if ("uri".equals(conditionData.getParamType()) && StringUtils.isNotBlank(conditionData.getParamValue())
+                && ("match".equals(conditionData.getOperator()) || "=".equals(conditionData.getOperator()))) {
+                uriList.add(conditionData.getParamValue().trim());
+            }
+        }
+        SELECT_ID_URI_LIST_MAP.put(selectorData.getId(), uriList);
+        SELECT_API_CONFIG_MAP.put(selectorData.getId(), logApiConfig);
+    }
+
+    /**
+     * drop the cached uri list and api log config of a selector.
+     *
+     * @param selectorData selector data being removed
+     */
+    @Override
+    public void removeSelector(final SelectorData selectorData) {
+        LOG.info("handler remove loggingKafka selector data:{}", GsonUtils.getGson().toJson(selectorData));
+        SELECT_ID_URI_LIST_MAP.remove(selectorData.getId());
+        SELECT_API_CONFIG_MAP.remove(selectorData.getId());
+    }
+
+    /**
+     * name of the plugin this handler is bound to.
+     *
+     * @return name of {@link PluginEnum#LOGGING_KAFKA}
+     */
+    @Override
+    public String pluginNamed() {
+        return PluginEnum.LOGGING_KAFKA.getName();
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClient.java
new file mode 100644
index 000000000..2464cd1c9
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.plugin.logging.kafka.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.LZ4CompressData;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * queue-based logging collector.
+ */
+public class KafkaLogCollectClient implements LogConsumeClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaLogCollectClient.class);
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    private KafkaProducer<String, String> producer;
+
+    private String topic;
+
+    /**
+     * init producer.
+     *
+     * @param props kafka props
+     */
+    public void initProducer(final Properties props) {
+        if (MapUtils.isEmpty(props)) {
+            LOG.error("kafka props is empty. failed init kafka producer");
+            return;
+        }
+        if (isStarted.get()) {
+            close();
+        }
+        String topic = props.getProperty(LoggingConstant.TOPIC);
+        String nameserverAddress = props.getProperty("bootstrap.servers");
+
+        if (StringUtils.isBlank(topic) || StringUtils.isBlank(nameserverAddress)) {
+            LOG.error("init kafkaLogCollectClient error, please check topic or nameserverAddress");
+            return;
+        }
+        this.topic = topic;
+
+        producer = new KafkaProducer<String, String>(props);
+
+        ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, "shenyu-access-logging");
+
+        try {
+            producer.send(record);
+            LOG.info("init kafkaLogCollectClient success");
+            isStarted.set(true);
+        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+            // We can't recover from these exceptions, so our only option is to close the producer and exit.
+            LOG.error("Init kafkaLogCollectClient error, We can't recover from these exceptions, so our only option is to close the producer and exit", e);
+            producer.close();
+        } catch (KafkaException e) {
+            // For all other exceptions, just abort the transaction and try again.
+            LOG.error(
+                "init kafkaLogCollectClient error,Exceptions other than ProducerFencedException or OutOfOrderSequenceException or AuthorizationException, just abort the transaction and try again", e);
+        }
+    }
+
+    /**
+     * store logs.
+     *
+     * @param logs list of log
+     */
+    @Override
+    public void consume(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+            return;
+        }
+        logs.forEach(log -> {
+            String logTopic = StringUtils.defaultIfBlank(LogCollectConfigUtils.getTopic(log.getPath()), topic);
+            try {
+                producer.send(toProducerRecord(logTopic, log));
+            } catch (Exception e) {
+                LOG.error("kafka push logs error", e);
+            }
+        });
+    }
+
+    private ProducerRecord<String, String> toProducerRecord(final String logTopic, final ShenyuRequestLog log) {
+        byte[] bytes = JsonUtils.toJson(log).getBytes(StandardCharsets.UTF_8);
+        String compressAlg = StringUtils.defaultIfBlank(LogCollectConfigUtils.getGlobalLogConfig().getCompressAlg(), "");
+        if ("LZ4".equalsIgnoreCase(compressAlg.trim())) {
+            LZ4CompressData lz4CompressData = new LZ4CompressData(bytes.length, compressedByte(bytes));
+            return new ProducerRecord<>(logTopic, JsonUtils.toJson(lz4CompressData));
+
+        } else {
+            return new ProducerRecord<>(logTopic, JsonUtils.toJson(log));
+        }
+    }
+
+    private byte[] compressedByte(final byte[] srcByte) {
+        LZ4Factory factory = LZ4Factory.fastestInstance();
+        LZ4Compressor compressor = factory.fastCompressor();
+        return compressor.compress(srcByte);
+    }
+
+
+    /**
+     * close producer.
+     */
+    @Override
+    public void close() {
+        if (producer != null && isStarted.get()) {
+            producer.close();
+            isStarted.set(false);
+        }
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSampler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSampler.java
new file mode 100644
index 000000000..3a42cd2ad
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSampler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.sampler;
+
+import java.util.BitSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+/**
+ * used for sample log.
+ * reference resources: http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+ */
+public class CountSampler implements Sampler {
+
+    private final AtomicInteger counter;
+
+    private final BitSet sampleDecisions;
+
+    /**
+     * Fills a bitset with decisions according to the supplied probability.
+     */
+    CountSampler(final float probability) {
+        counter = new AtomicInteger();
+        int percent = (int) (probability * 100.0f);
+        this.sampleDecisions = genRandomBitSet(100, percent);
+    }
+
+    /**
+     * create a sampler instance.
+     *
+     * @param probability probability
+     * @return sampler instance
+     */
+    public static Sampler create(final String probability) {
+        if (StringUtils.isBlank(probability)) {
+            return ALWAYS_SAMPLE;
+        }
+        if ("0".equals(probability)) {
+            return NEVER_SAMPLE;
+        }
+        if ("1".equals(probability) || "1.0".equals(probability) || "1.0.0".equals(probability)) {
+            return ALWAYS_SAMPLE;
+        }
+        float parseProbability = NumberUtils.toFloat(probability, 1);
+        if (parseProbability < 0.01f || parseProbability > 1) {
+            throw new IllegalArgumentException(
+                "probability should be between 0.01 and 1: was " + probability);
+        }
+        return new CountSampler(parseProbability);
+    }
+
+    /**
+     * loops over the pre-canned decisions, resetting to zero when it gets to the end.
+     */
+    @Override
+    public boolean isSampled(final ServerHttpRequest request) {
+        return sampleDecisions.get(mod(counter.getAndIncrement()));
+    }
+
+    /**
+     * Returns a non-negative mod.
+     */
+    private int mod(final int dividend) {
+        int result = dividend % 100;
+        return result >= 0 ? result : 100 + result;
+    }
+
+    /**
+     * gen random bitSet.
+     * reference resources: http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+     *
+     * @param size        bitmap size
+     * @param cardinality cardinality
+     * @return bitSet
+     */
+    private BitSet genRandomBitSet(final int size, final int cardinality) {
+        BitSet result = new BitSet(size);
+        int[] chosen = new int[cardinality];
+        int i;
+        for (i = 0; i < cardinality; ++i) {
+            chosen[i] = i;
+            result.set(i);
+        }
+        Random random = new Random();
+        for (; i < size; ++i) {
+            int j = random.nextInt(i + 1);
+            if (j < cardinality) {
+                result.clear(chosen[j]);
+                result.set(i);
+                chosen[j] = i;
+            }
+        }
+        return result;
+    }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/Sampler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/Sampler.java
new file mode 100644
index 000000000..d724b757c
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/Sampler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.sampler;
+
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+/**
+ * sampler interface: decides for a request whether it is sampled.
+ */
+public interface Sampler {
+
+    /**
+     * sampler that answers {@code true} for every request.
+     */
+    Sampler ALWAYS_SAMPLE = request -> true;
+
+    /**
+     * sampler that answers {@code false} for every request.
+     */
+    Sampler NEVER_SAMPLE = request -> false;
+
+    /**
+     * judge a ServerHttpRequest should be sample.
+     *
+     * @param request request
+     * @return whether sample
+     */
+    boolean isSampled(ServerHttpRequest request);
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtils.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtils.java
new file mode 100644
index 000000000..b7f061323
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig;
+import org.apache.shenyu.plugin.logging.kafka.sampler.CountSampler;
+import org.apache.shenyu.plugin.logging.kafka.sampler.Sampler;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.util.AntPathMatcher;
+
+/**
+ * log collect config Utils.
+ *
+ * <p>Holds the global log config, the global sampler and the per-api sampler/topic maps
+ * in static fields. The fields are volatile so that a replaced reference is visible to
+ * readers on other threads (NOTE(review): presumably config updates and request handling
+ * run on different threads - confirm against the callers).
+ */
+public final class LogCollectConfigUtils {
+
+    private static final AntPathMatcher MATCHER = new AntPathMatcher();
+
+    private static final LogCollectConfig.GlobalLogConfig DEFAULT_GLOBAL_LOG_CONFIG =
+        new LogCollectConfig.GlobalLogConfig();
+
+    private static volatile LogCollectConfig.GlobalLogConfig globalLogConfig;
+
+    private static volatile Map<String, Sampler> apiSamplerMap = new HashMap<>();
+
+    private static volatile Map<String, String> apiTopicMap = new HashMap<>();
+
+    private static volatile Sampler globalSampler = Sampler.ALWAYS_SAMPLE;
+
+    private LogCollectConfigUtils() {
+    }
+
+    /**
+     * set global config.
+     *
+     * @param config global config
+     */
+    public static void setGlobalConfig(final LogCollectConfig.GlobalLogConfig config) {
+        globalLogConfig = config;
+    }
+
+    /**
+     * set api sample.
+     *
+     * <p>A new sampler map is built and then swapped in as a whole. A blank sample rate maps
+     * the path to the current global sampler. A null argument clears all api samplers.
+     *
+     * @param uriSampleMap api sample map (path pattern to sample rate), may be null
+     * @throws IllegalArgumentException when a sample rate is rejected by {@link CountSampler#create(String)}
+     */
+    public static void setSampler(final Map<String, String> uriSampleMap) {
+        Map<String, Sampler> samplerMap = new HashMap<>();
+        if (Objects.nonNull(uriSampleMap)) {
+            uriSampleMap.forEach((path, sampler) -> {
+                if (StringUtils.isBlank(sampler)) {
+                    samplerMap.put(path, globalSampler);
+                } else {
+                    samplerMap.put(path, CountSampler.create(sampler));
+                }
+            });
+        }
+        apiSamplerMap = samplerMap;
+    }
+
+    /**
+     * set api topic map.
+     *
+     * <p>The given map is used as is (no copy); a null argument clears all api topics
+     * instead of making every later {@link #getTopic(String)} call fail.
+     *
+     * @param uriTopicMap api topic map (path pattern to topic), may be null
+     */
+    public static void setTopic(final Map<String, String> uriTopicMap) {
+        apiTopicMap = Objects.nonNull(uriTopicMap) ? uriTopicMap : new HashMap<>();
+    }
+
+    /**
+     * set global Sampler.
+     *
+     * <p>A blank value leaves the current global sampler untouched; a value that cannot be
+     * turned into a sampler falls back to {@link Sampler#ALWAYS_SAMPLE}.
+     *
+     * @param sampler global sampler
+     */
+    public static void setGlobalSampler(final String sampler) {
+        if (StringUtils.isNotBlank(sampler)) {
+            try {
+                globalSampler = CountSampler.create(sampler);
+            } catch (Exception e) {
+                globalSampler = Sampler.ALWAYS_SAMPLE;
+            }
+        }
+    }
+
+    /**
+     * judge whether sample.
+     *
+     * <p>The first api pattern matching the request path decides; without a match the
+     * global sampler decides.
+     *
+     * @param request request
+     * @return whether sample
+     */
+    public static boolean isSampled(final ServerHttpRequest request) {
+        String path = request.getURI().getPath();
+        for (Map.Entry<String, Sampler> entry : apiSamplerMap.entrySet()) {
+            String pattern = entry.getKey();
+            if (MATCHER.match(pattern, path)) {
+                return entry.getValue().isSampled(request);
+            }
+        }
+        // read the volatile field once so the null check and the call use the same sampler
+        Sampler sampler = globalSampler;
+        if (sampler != null) {
+            return sampler.isSampled(request);
+        }
+        return true;
+    }
+
+    /**
+     * judge whether request body too large.
+     *
+     * @param bodySize body size
+     * @return whether request body too large, always false while no global config is set
+     */
+    public static boolean isRequestBodyTooLarge(final int bodySize) {
+        LogCollectConfig.GlobalLogConfig config = globalLogConfig;
+        if (Objects.isNull(config)) {
+            return false;
+        }
+        return bodySize > config.getMaxRequestBody();
+    }
+
+    /**
+     * judge whether response body too large.
+     *
+     * @param bodySize body size.
+     * @return whether response body too large, always false while no global config is set
+     */
+    public static boolean isResponseBodyTooLarge(final int bodySize) {
+        LogCollectConfig.GlobalLogConfig config = globalLogConfig;
+        if (Objects.isNull(config)) {
+            return false;
+        }
+        return bodySize > config.getMaxResponseBody();
+    }
+
+    /**
+     * get global log config.
+     *
+     * @return global log config, or a default instance while none is set
+     */
+    public static LogCollectConfig.GlobalLogConfig getGlobalLogConfig() {
+        return Optional.ofNullable(globalLogConfig).orElse(DEFAULT_GLOBAL_LOG_CONFIG);
+    }
+
+    /**
+     * get message queue topic.
+     *
+     * @param path request path
+     * @return topic of the first api pattern matching the path, or an empty string without a match
+     */
+    public static String getTopic(final String path) {
+        for (Map.Entry<String, String> entry : apiTopicMap.entrySet()) {
+            String pattern = entry.getKey();
+            if (MATCHER.match(pattern, path)) {
+                return entry.getValue();
+            }
+        }
+        return "";
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtils.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtils.java
new file mode 100644
index 000000000..317661416
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * log collect utils.
+ */
+public final class LogCollectUtils {
+
+    private static final List<String> BINARY_TYPE_LIST = Arrays.asList("image", "multipart", "cbor",
+        "octet-stream", "pdf", "javascript", "css", "html");
+
+    private LogCollectUtils() {
+    }
+
+    /**
+     * judge whether the content is NOT one of the types excluded from body logging.
+     *
+     * @param headers request or response header
+     * @return false when the type or the subtype of the content type is in the excluded list;
+     *     true otherwise, including null headers and a missing content type
+     */
+    public static boolean isNotBinaryType(final HttpHeaders headers) {
+        return Optional.ofNullable(headers).map(HttpHeaders::getContentType)
+            .map(contentType -> !BINARY_TYPE_LIST.contains(contentType.getType())
+                && !BINARY_TYPE_LIST.contains(contentType.getSubtype()))
+            .orElse(true);
+    }
+
+    /**
+     * get request header string.
+     *
+     * <p>Multiple values of one header are joined with ",". Null headers are handled like
+     * empty headers, in line with the null tolerance of {@link #isNotBinaryType(HttpHeaders)}.
+     *
+     * @param headers request headers
+     * @return header string (json of header name to joined values)
+     */
+    public static String getHeaders(final HttpHeaders headers) {
+        HttpHeaders source = Optional.ofNullable(headers).orElseGet(HttpHeaders::new);
+        Map<String, String> map = source.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
+        return JsonUtils.toJson(map);
+    }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollectorTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollectorTest.java
new file mode 100644
index 000000000..bbe08226a
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollectorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.lang.reflect.Field;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.kafka.KafkaLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For DefaultLogCollector.
+ */
+public class DefaultLogCollectorTest {
+
+    private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+    @BeforeEach
+    public void setUp() {
+        shenyuRequestLog.setClientIp("0.0.0.0");
+        shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+    }
+
+    /**
+     * start / collect / close must flip the "started" flag of the collector,
+     * which is read via reflection from AbstractLogCollector.
+     */
+    @Test
+    public void testAbstractLogCollector() throws Exception {
+        DefaultLogCollector.getInstance().start();
+        // one reflective handle is enough for both checks
+        Field started = AbstractLogCollector.class.getDeclaredField("started");
+        started.setAccessible(true);
+        // expected value first, so that a failure message reports expected/actual correctly
+        Assertions.assertEquals("true", started.get(DefaultLogCollector.getInstance()).toString());
+        DefaultLogCollector.getInstance().collect(shenyuRequestLog);
+        DefaultLogCollector.getInstance().close();
+        Assertions.assertEquals("false", started.get(DefaultLogCollector.getInstance()).toString());
+    }
+
+    /**
+     * the default collector must hand out the kafka client.
+     */
+    @Test
+    public void testGetLogConsumeClient() {
+        LogConsumeClient logConsumeClient = new DefaultLogCollector().getLogConsumeClient();
+        Assertions.assertEquals(KafkaLogCollectClient.class, logConsumeClient.getClass());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
new file mode 100644
index 000000000..280c77906
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.net.InetSocketAddress;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.RemoteAddressResolver;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * Tests LoggingKafkaPlugin: chain execution plus the order and name taken from PluginEnum.
+ */
+@ExtendWith(MockitoExtension.class)
+public final class LoggingKafkaPluginTest {
+
+    private LoggingKafkaPlugin loggingKafkaPlugin;
+
+    private ServerWebExchange exchange;
+
+    private RuleData ruleData;
+
+    private ShenyuPluginChain chain;
+
+    private SelectorData selectorData;
+
+    @BeforeEach
+    public void setUp() {
+        this.loggingKafkaPlugin = new LoggingKafkaPlugin();
+        this.ruleData = Mockito.mock(RuleData.class);
+        this.chain = Mockito.mock(ShenyuPluginChain.class);
+        this.selectorData = Mockito.mock(SelectorData.class);
+        MockServerHttpRequest request = MockServerHttpRequest
+            .get("localhost")
+            .remoteAddress(new InetSocketAddress(8090))
+            .header("X-source", "mock test")
+            .queryParam("queryParam", "Hello,World")
+            .build();
+        ConfigurableApplicationContext context = Mockito.mock(ConfigurableApplicationContext.class);
+        SpringBeanUtils.getInstance().setApplicationContext(context);
+        RemoteAddressResolver remoteAddressResolver = new RemoteAddressResolver() {
+        };
+        Mockito.lenient().when(context.getBean(RemoteAddressResolver.class)).thenReturn(remoteAddressResolver);
+        this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+        ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+    }
+
+    @Test
+    public void testDoExecute() {
+        Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
+        Mono<Void> result = loggingKafkaPlugin.doExecute(exchange, chain, selectorData, ruleData);
+        StepVerifier.create(result).expectSubscription().verifyComplete();
+    }
+
+    @Test
+    public void testGetOrder() {
+        Assertions.assertEquals(PluginEnum.LOGGING_KAFKA.getCode(), loggingKafkaPlugin.getOrder());
+    }
+
+    @Test
+    public void testNamed() {
+        Assertions.assertEquals(PluginEnum.LOGGING_KAFKA.getName(), loggingKafkaPlugin.named());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriterTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriterTest.java
new file mode 100644
index 000000000..d46634a0b
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests BodyWriter: writing a buffer, the initial empty state, size and string output.
+ */
+public class BodyWriterTest {
+
+    private BodyWriter writer;
+
+    private String sendString;
+
+    private ByteBuffer byteBuffer;
+
+    @BeforeEach
+    public void setUp() throws UnsupportedEncodingException {
+        this.writer = new BodyWriter();
+        this.sendString = "hello, shenyu";
+        byteBuffer = ByteBuffer.wrap(sendString.getBytes("UTF-8"));
+    }
+
+    @Test
+    public void testWrite() {
+        // setUp() already wraps sendString in byteBuffer, so no shadowing local is needed.
+        writer.write(byteBuffer.asReadOnlyBuffer());
+        String res = writer.output();
+        Assertions.assertEquals(sendString, res);
+    }
+
+    @Test
+    public void testIsEmpty() {
+        Assertions.assertTrue(writer.isEmpty());
+    }
+
+    @Test
+    public void testSize() {
+        writer.write(byteBuffer.asReadOnlyBuffer());
+        int size = writer.size();
+        Assertions.assertEquals(13, size);
+    }
+
+    @Test
+    public void testOutput() {
+        writer.write(byteBuffer.asReadOnlyBuffer());
+        String res = writer.output();
+        Assertions.assertEquals(sendString, res);
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponseTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponseTest.java
new file mode 100644
index 000000000..0414eafeb
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponseTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.RemoteAddressResolver;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.result.ShenyuResult;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.apache.shenyu.plugin.logging.kafka.DefaultLogCollector;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * Tests LoggingServerHttpResponse: exchange wiring, trace id and upstream lookup, and logging.
+ */
+public class LoggingServerHttpResponseTest {
+
+    private final ShenyuRequestLog requestInfo = new ShenyuRequestLog();
+
+    private ServerWebExchange exchange;
+
+    private LoggingServerHttpResponse loggingServerHttpResponse;
+
+    private ServerHttpRequest serverHttpRequest;
+
+    private final String userAgent = "User-Agent";
+
+    private final String host = "Host";
+
+    private final LocalDateTime startDateTime = LocalDateTime.now();
+
+    @BeforeEach
+    public void setUp() {
+        MockServerHttpRequest request = MockServerHttpRequest
+            .post("localhost")
+            .remoteAddress(new InetSocketAddress(8090))
+            .header("X-source", "mock test")
+            .queryParam("queryParam", "Hello,World")
+            .body("hello");
+        ConfigurableApplicationContext context = Mockito.mock(ConfigurableApplicationContext.class);
+        SpringBeanUtils.getInstance().setApplicationContext(context);
+        RemoteAddressResolver remoteAddressResolver = new RemoteAddressResolver() {
+        };
+        ShenyuResult shenyuResult = new ShenyuResult() {
+        };
+        this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+        Mockito.lenient().when(context.getBean(RemoteAddressResolver.class)).thenReturn(remoteAddressResolver);
+        Mockito.lenient().when(context.getBean(ShenyuResult.class)).thenReturn(shenyuResult);
+        ShenyuContext shenyuContext = new ShenyuContext();
+        shenyuContext.setStartDateTime(startDateTime);
+        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+        exchange.getAttributes().put(LoggingConstant.SHENYU_AGENT_TRACE_ID, "shenyu-agent-trace-id");
+        exchange.getAttributes().put(Constants.HTTP_DOMAIN, "http://localhost:9195/http/order/path/123/name");
+        this.serverHttpRequest = exchange.getRequest();
+        requestInfo.setRequestUri(serverHttpRequest.getURI().toString());
+        requestInfo.setMethod(serverHttpRequest.getMethodValue());
+        requestInfo.setRequestHeader(LogCollectUtils.getHeaders(serverHttpRequest.getHeaders()));
+        requestInfo.setQueryParams(serverHttpRequest.getURI().getQuery());
+        requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
+        requestInfo.setUserAgent(serverHttpRequest.getHeaders().getFirst(userAgent));
+        requestInfo.setHost(serverHttpRequest.getHeaders().getFirst(host));
+        requestInfo.setPath(serverHttpRequest.getURI().getPath());
+        this.loggingServerHttpResponse = new LoggingServerHttpResponse(exchange.getResponse(), requestInfo, DefaultLogCollector.getInstance());
+    }
+
+    @Test
+    public void testSetExchange() throws NoSuchFieldException, IllegalAccessException {
+        loggingServerHttpResponse.setExchange(exchange);
+        Field field = loggingServerHttpResponse.getClass().getDeclaredField("exchange");
+        field.setAccessible(true);
+        Assertions.assertEquals(exchange, field.get(loggingServerHttpResponse));
+    }
+
+    @Test
+    public void testGetTraceId() throws Exception {
+        loggingServerHttpResponse.setExchange(exchange);
+        exchange.getResponse().getHeaders();
+        Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("getTraceId");
+        method.setAccessible(true);
+        String traceId = (String) method.invoke(loggingServerHttpResponse);
+        Assertions.assertEquals("shenyu-agent-trace-id", traceId);
+    }
+
+    @Test
+    public void testGetUpstreamIp() throws Exception {
+        loggingServerHttpResponse.setExchange(exchange);
+        Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("getUpstreamIp");
+        method.setAccessible(true);
+        String upstreamIp = (String) method.invoke(loggingServerHttpResponse);
+        Assertions.assertEquals("localhost", upstreamIp);
+    }
+
+    @Test
+    public void testGetUpstreamIpFromHttpDomain() throws Exception {
+        loggingServerHttpResponse.setExchange(exchange);
+        Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("getUpstreamIpFromHttpDomain");
+        method.setAccessible(true);
+        String upstreamIpFromHttpDomain = (String) method.invoke(loggingServerHttpResponse);
+        Assertions.assertEquals("localhost", upstreamIpFromHttpDomain);
+    }
+
+    @Test
+    public void testLogError() throws NoSuchFieldException, IllegalAccessException {
+        loggingServerHttpResponse.setExchange(exchange);
+        Throwable throwable = new Throwable("error");
+        DefaultLogCollector.getInstance().start();
+        loggingServerHttpResponse.logError(throwable);
+        Field field = loggingServerHttpResponse.getClass().getDeclaredField("logInfo");
+        field.setAccessible(true);
+        ShenyuRequestLog shenyuRequestLog = (ShenyuRequestLog) field.get(loggingServerHttpResponse);
+        Assertions.assertEquals(500, shenyuRequestLog.getStatus());
+    }
+
+    @Test
+    public void testLogResponse() throws Exception {
+        DefaultLogCollector.getInstance().start();
+        loggingServerHttpResponse.setExchange(exchange);
+        BodyWriter writer = new BodyWriter();
+        String sendString = "hello, shenyu";
+        ByteBuffer byteBuffer = ByteBuffer.wrap(sendString.getBytes("UTF-8"));
+        writer.write(byteBuffer.asReadOnlyBuffer());
+        Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("logResponse", ShenyuContext.class, BodyWriter.class);
+        method.setAccessible(true);
+        method.invoke(loggingServerHttpResponse, exchange.getAttribute(Constants.CONTEXT), writer);
+        Field field = loggingServerHttpResponse.getClass().getDeclaredField("logInfo");
+        field.setAccessible(true);
+        ShenyuRequestLog log = (ShenyuRequestLog) field.get(loggingServerHttpResponse);
+        Assertions.assertEquals(sendString, log.getResponseBody());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfigTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfigTest.java
new file mode 100644
index 000000000..4097a9875
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfigTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the accessors of LogCollectConfig and its nested LogApiConfig / GlobalLogConfig.
+ */
+public class LogCollectConfigTest {
+
+    private final LogCollectConfig logCollectConfig = new LogCollectConfig();
+
+    @Test
+    public void testSetLogApiConfigTopic() {
+        LogCollectConfig.LogApiConfig logApiConfig = new LogCollectConfig.LogApiConfig();
+        logApiConfig.setTopic("test");
+        Assertions.assertEquals("test", logApiConfig.getTopic());
+    }
+
+    @Test
+    public void testSetLogApiConfigSampleRate() {
+        LogCollectConfig.LogApiConfig logApiConfig = new LogCollectConfig.LogApiConfig();
+        logApiConfig.setSampleRate("1");
+        Assertions.assertEquals("1", logApiConfig.getSampleRate());
+    }
+
+    @Test
+    public void testGetGlobalLogConfigSampleRate() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        globalLogConfig.setSampleRate("1");
+        Assertions.assertEquals("1", globalLogConfig.getSampleRate());
+    }
+
+    @Test
+    public void testSetGlobalLogConfigTopic() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        globalLogConfig.setTopic("test");
+        Assertions.assertEquals("test", globalLogConfig.getTopic());
+    }
+
+    @Test
+    public void testSetGlobalLogConfigMaxResponseBody() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        globalLogConfig.setMaxResponseBody(5);
+        Assertions.assertEquals(5, globalLogConfig.getMaxResponseBody());
+    }
+
+    @Test
+    public void testSetGlobalLogConfigMaxRequestBody() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        globalLogConfig.setMaxRequestBody(5);
+        Assertions.assertEquals(5, globalLogConfig.getMaxRequestBody());
+    }
+
+    @Test
+    public void testSetGlobalLogConfigNamesrvAddr() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        globalLogConfig.setNamesrvAddr("test");
+        Assertions.assertEquals("test", globalLogConfig.getNamesrvAddr());
+    }
+
+    @Test
+    public void testSetGlobalLogConfigProducerGroup() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        globalLogConfig.setProducerGroup("test");
+        Assertions.assertEquals("test", globalLogConfig.getProducerGroup());
+    }
+
+    @Test
+    public void testGetGlobalLogConfig() {
+        LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+        logCollectConfig.setGlobalLogConfig(globalLogConfig);
+        Assertions.assertEquals(globalLogConfig, logCollectConfig.getGlobalLogConfig());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressDataTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressDataTest.java
new file mode 100644
index 000000000..4f7132d67
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressDataTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the length and compressed-data accessors of LZ4CompressData.
+ */
+public class LZ4CompressDataTest {
+
+    private int length;
+
+    private byte[] compressedData;
+
+    private LZ4CompressData lz4CompressData;
+
+    @BeforeEach
+    public void setUp() {
+        this.length = 5;
+        this.compressedData = new byte[] {'h', 'e', 'l', 'l', 'o'};
+        this.lz4CompressData = new LZ4CompressData(length, compressedData);
+    }
+
+    @Test
+    public void testGetLength() {
+        lz4CompressData.setLength(6);
+        Assertions.assertEquals(6, lz4CompressData.getLength());
+    }
+
+    @Test
+    public void testGetCompressedData() {
+        byte[] bytes = new byte[] {'h', 'e', 'l', 'l', 'o', '!'};
+        lz4CompressData.setCompressedData(bytes);
+        Assertions.assertArrayEquals(bytes, lz4CompressData.getCompressedData());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLogTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLogTest.java
new file mode 100644
index 000000000..dc512dee4
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLogTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+import java.time.LocalDateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that each ShenyuRequestLog setter is reflected by the matching getter.
+ */
+public class ShenyuRequestLogTest {
+
+    private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+    @Test
+    public void testGetModule() {
+        shenyuRequestLog.setModule("test");
+        Assertions.assertEquals("test", shenyuRequestLog.getModule());
+    }
+
+    @Test
+    public void testResponseContentLength() {
+        shenyuRequestLog.setResponseContentLength(5);
+        Assertions.assertEquals(5, shenyuRequestLog.getResponseContentLength());
+    }
+
+    @Test
+    public void testGetUserAgent() {
+        shenyuRequestLog.setUserAgent("test");
+        Assertions.assertEquals("test", shenyuRequestLog.getUserAgent());
+    }
+
+    @Test
+    public void testGetHost() {
+        shenyuRequestLog.setHost("test");
+        Assertions.assertEquals("test", shenyuRequestLog.getHost());
+    }
+
+    @Test
+    public void testGetClientIp() {
+        shenyuRequestLog.setClientIp("0.0.0.0");
+        Assertions.assertEquals("0.0.0.0", shenyuRequestLog.getClientIp());
+    }
+
+    @Test
+    public void testGetTimeLocal() {
+        LocalDateTime timeLocal = LocalDateTime.now();
+        shenyuRequestLog.setTimeLocal(timeLocal.toString());
+        Assertions.assertEquals(timeLocal.toString(), shenyuRequestLog.getTimeLocal());
+    }
+
+    @Test
+    public void testGetMethod() {
+        shenyuRequestLog.setMethod("test");
+        Assertions.assertEquals("test", shenyuRequestLog.getMethod());
+    }
+
+    @Test
+    public void testGetRequestBody() {
+        shenyuRequestLog.setRequestBody("hello");
+        Assertions.assertEquals("hello", shenyuRequestLog.getRequestBody());
+    }
+
+    @Test
+    public void testGetUpstreamIp() {
+        shenyuRequestLog.setUpstreamIp("0.0.0.0");
+        Assertions.assertEquals("0.0.0.0", shenyuRequestLog.getUpstreamIp());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
new file mode 100644
index 000000000..e869a927f
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.handler;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.plugin.logging.kafka.kafka.KafkaLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests LoggingKafkaPluginDataHandler: plugin and selector handling, naming and accessors.
+ */
+public class LoggingKafkaPluginDataHandlerTest {
+
+    private LoggingKafkaPluginDataHandler loggingKafkaPluginDataHandler;
+
+    private final SelectorData selectorData = new SelectorData();
+
+    private final ConditionData conditionData = new ConditionData();
+
+    private final PluginData pluginData = new PluginData();
+
+    @BeforeEach
+    public void setUp() {
+        this.loggingKafkaPluginDataHandler = new LoggingKafkaPluginDataHandler();
+        selectorData.setId("1");
+        selectorData.setType(1);
+        selectorData.setHandle("{\"topic\":\"test\", \"sampleRate\":\"1\"}");
+        conditionData.setParamName("id");
+        conditionData.setParamType("uri");
+        conditionData.setParamValue("11");
+        conditionData.setOperator("=");
+        List<ConditionData> list = new ArrayList<>();
+        list.add(conditionData);
+        selectorData.setConditionList(list);
+        pluginData.setEnabled(true);
+        pluginData.setConfig("{\"topic\":\"test\", \"namesrvAddr\":\"localhost:8082\"}");
+    }
+
+    @Test
+    public void testHandlerPlugin() throws NoSuchFieldException, IllegalAccessException {
+        loggingKafkaPluginDataHandler.handlerPlugin(pluginData);
+        Field field = loggingKafkaPluginDataHandler.getClass().getDeclaredField("KAFKA_LOG_COLLECT_CLIENT");
+        field.setAccessible(true);
+        Assertions.assertEquals(KafkaLogCollectClient.class, field.get(loggingKafkaPluginDataHandler).getClass());
+    }
+
+    @Test
+    public void testHandlerSelector() throws NoSuchFieldException, IllegalAccessException {
+        loggingKafkaPluginDataHandler.handlerSelector(selectorData);
+        Field uriListMap = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+        uriListMap.setAccessible(true);
+        Field apiConfigMap = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+        apiConfigMap.setAccessible(true);
+        Assertions.assertEquals("{1=[11]}", uriListMap.get(null).toString());
+        Assertions.assertNotEquals("{}", apiConfigMap.get(null).toString());
+    }
+
+    @Test
+    public void testRemoveSelector() throws NoSuchFieldException, IllegalAccessException {
+        // Both maps are static, so one reflective handle per map covers the checks made
+        // before and after removal, and get(null) reads them without a receiver object.
+        Field uriListMap = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+        uriListMap.setAccessible(true);
+        Field apiConfigMap = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+        apiConfigMap.setAccessible(true);
+        loggingKafkaPluginDataHandler.handlerSelector(selectorData);
+        // Handling the selector must populate both maps.
+        Assertions.assertEquals("{1=[11]}", uriListMap.get(null).toString());
+        Assertions.assertNotEquals("{}", apiConfigMap.get(null).toString());
+        loggingKafkaPluginDataHandler.removeSelector(selectorData);
+        // Removing the selector must leave both maps empty again.
+        Assertions.assertEquals("{}", uriListMap.get(null).toString());
+        Assertions.assertEquals("{}", apiConfigMap.get(null).toString());
+    }
+
+    @Test
+    public void testPluginNamed() {
+        Assertions.assertEquals("loggingKafka", loggingKafkaPluginDataHandler.pluginNamed());
+    }
+
+    @Test
+    public void testGetKafkaLogCollectClient() {
+        Assertions.assertEquals(KafkaLogCollectClient.class, loggingKafkaPluginDataHandler.getKafkaLogCollectClient().getClass());
+    }
+
+    @Test
+    public void testGetSelectIdUriListMap() {
+        Assertions.assertEquals(ConcurrentHashMap.class, LoggingKafkaPluginDataHandler.getSelectIdUriListMap().getClass());
+    }
+
+    @Test
+    public void testGetSelectApiConfigMap() {
+        Assertions.assertEquals(ConcurrentHashMap.class, LoggingKafkaPluginDataHandler.getSelectApiConfigMap().getClass());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClientTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClientTest.java
new file mode 100644
index 000000000..4092282c5
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClientTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.kafka;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig.GlobalLogConfig;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For KafkaLogCollectClient.
+ */
+public class KafkaLogCollectClientTest {
+
+    private final Properties props = new Properties();
+
+    private final PluginData pluginData = new PluginData();
+
+    private final List<ShenyuRequestLog> logs = new ArrayList<>();
+
+    private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+    private KafkaLogCollectClient kafkaLogCollectClient;
+
+    private GlobalLogConfig globalLogConfig;
+
+    @BeforeEach
+    public void setUp() {
+        this.kafkaLogCollectClient = new KafkaLogCollectClient();
+        pluginData.setEnabled(true);
+        pluginData.setConfig("{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\":\"localhost:8082\"}");
+        globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
+            GlobalLogConfig.class);
+        globalLogConfig.setCompressAlg("LZ4");
+        props.put("bootstrap.servers", globalLogConfig.getNamesrvAddr());
+        props.put(LoggingConstant.NAMESERVER_ADDRESS, globalLogConfig.getNamesrvAddr());
+        props.setProperty(LoggingConstant.TOPIC, globalLogConfig.getTopic());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        shenyuRequestLog.setClientIp("0.0.0.0");
+        shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+        logs.add(shenyuRequestLog);
+    }
+
+    @Test
+    public void testInitProducer() throws NoSuchFieldException, IllegalAccessException {
+        kafkaLogCollectClient.initProducer(props);
+        Field topicField = kafkaLogCollectClient.getClass().getDeclaredField("topic");
+        topicField.setAccessible(true);
+        Assertions.assertEquals("shenyu-access-logging", topicField.get(kafkaLogCollectClient));
+        kafkaLogCollectClient.close();
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSamplerTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSamplerTest.java
new file mode 100644
index 000000000..917252d07
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSamplerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.sampler;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.BitSet;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * The Test Case For CountSampler.
+ */
+public class CountSamplerTest {
+
+    private CountSampler countSampler;
+
+    private ServerHttpRequest request;
+
+    private ServerWebExchange exchange;
+
+    @BeforeEach
+    public void setUp() {
+        this.countSampler = new CountSampler(1);
+        MockServerHttpRequest mockRequest = MockServerHttpRequest
+            .get("localhost")
+            .remoteAddress(new InetSocketAddress(8090))
+            .header("X-source", "mock test")
+            .queryParam("queryParam", "Hello,World")
+            .build();
+        this.exchange = Mockito.spy(MockServerWebExchange.from(mockRequest));
+        ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+        this.request = exchange.getRequest();
+    }
+
+    @Test
+    public void testIsSampled() {
+        Assertions.assertTrue(countSampler.isSampled(request));
+    }
+
+    @Test
+    public void testMod() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Method method = countSampler.getClass().getDeclaredMethod("mod", int.class);
+        method.setAccessible(true);
+        int actual = (int) method.invoke(countSampler, 1);
+        Assertions.assertEquals(1, actual);
+    }
+
+    @Test
+    public void testGenRandomBitSet() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Method method = countSampler.getClass().getDeclaredMethod("genRandomBitSet", int.class, int.class);
+        method.setAccessible(true);
+        BitSet actual = (BitSet) method.invoke(countSampler, 1, 1);
+        BitSet expected = new BitSet(1);
+        expected.set(0);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testCreate() {
+        Assertions.assertEquals(Sampler.ALWAYS_SAMPLE, CountSampler.create(""));
+        Assertions.assertEquals(Sampler.NEVER_SAMPLE, CountSampler.create("0"));
+        Assertions.assertEquals(Sampler.ALWAYS_SAMPLE, CountSampler.create("1"));
+        Assertions.assertEquals(CountSampler.class, CountSampler.create("0.5").getClass());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtilsTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtilsTest.java
new file mode 100644
index 000000000..33dac94a1
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig.GlobalLogConfig;
+import org.apache.shenyu.plugin.logging.kafka.sampler.Sampler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * The Test Case For LogCollectConfigUtils.
+ */
+
+public class LogCollectConfigUtilsTest {
+
+    private GlobalLogConfig config = new GlobalLogConfig();
+
+    private ServerWebExchange exchange;
+
+    private ServerHttpRequest request;
+
+    private Map<String, String> uriSampleMap = new HashMap<>();
+
+    private Map<String, String> apiTopicMap = new HashMap<>();
+
+    @BeforeEach
+    public void setUp() {
+        config.setBufferQueueSize(5000);
+        LogCollectConfigUtils.setGlobalConfig(config);
+        uriSampleMap.put("const", "1");
+        apiTopicMap.put("topic", "shenyu-access-logging");
+        MockServerHttpRequest mockRequest = MockServerHttpRequest
+            .get("localhost")
+            .remoteAddress(new InetSocketAddress(8090))
+            .header("X-source", "mock test")
+            .queryParam("queryParam", "Hello,World")
+            .build();
+        this.exchange = Mockito.spy(MockServerWebExchange.from(mockRequest));
+        ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+        this.request = exchange.getRequest();
+    }
+
+    @Test
+    public void testGetGlobalConfig() {
+        GlobalLogConfig globalLogConfig = LogCollectConfigUtils.getGlobalLogConfig();
+        assertEquals(GlobalLogConfig.class, globalLogConfig.getClass());
+    }
+
+    @Test
+    public void testSetGlobalConfig() {
+        assertEquals(5000, LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize());
+    }
+
+    @Test
+    public void testSetSampler() throws IllegalAccessException, NoSuchFieldException {
+        LogCollectConfigUtils.setSampler(uriSampleMap);
+        Field field = LogCollectConfigUtils.class.getDeclaredField("apiSamplerMap");
+        field.setAccessible(true);
+        Assertions.assertEquals("{const=" + Sampler.ALWAYS_SAMPLE + "}", field.get(null).toString());
+    }
+
+    @Test
+    public void testIsSampled() {
+        Assertions.assertFalse(LogCollectConfigUtils.isSampled(request));
+    }
+
+    @Test
+    public void testIsRequestBodyTooLarge() {
+        Assertions.assertTrue(LogCollectConfigUtils.isRequestBodyTooLarge(524289));
+        Assertions.assertFalse(LogCollectConfigUtils.isRequestBodyTooLarge(524288));
+    }
+
+    @Test
+    public void testIsResponseBodyTooLarge() {
+        Assertions.assertTrue(LogCollectConfigUtils.isResponseBodyTooLarge(524289));
+        Assertions.assertFalse(LogCollectConfigUtils.isResponseBodyTooLarge(524288));
+    }
+
+    @Test
+    public void testGetTopic() {
+        assertEquals("", LogCollectConfigUtils.getTopic("topic"));
+    }
+
+    @Test
+    public void testSetTopic() {
+        LogCollectConfigUtils.setTopic(apiTopicMap);
+        assertEquals("shenyu-access-logging", LogCollectConfigUtils.getTopic("topic"));
+    }
+
+    @Test
+    public void testSetGlobalSampler() throws NoSuchFieldException, IllegalAccessException {
+        LogCollectConfigUtils.setGlobalSampler("0");
+        Field field = LogCollectConfigUtils.class.getDeclaredField("globalSampler");
+        field.setAccessible(true);
+        assertEquals(Sampler.NEVER_SAMPLE, field.get(null));
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtilsTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtilsTest.java
new file mode 100644
index 000000000..25482bd91
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.net.InetSocketAddress;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * The Test Case For LogCollectUtils.
+ */
+public class LogCollectUtilsTest {
+
+    private ServerWebExchange exchange;
+
+    private ServerHttpRequest request;
+
+    @BeforeEach
+    public void setUp() {
+        MockServerHttpRequest mockRequest = MockServerHttpRequest
+            .get("localhost")
+            .remoteAddress(new InetSocketAddress(8090))
+            .header("X-source", "mock test")
+            .queryParam("queryParam", "Hello,World")
+            .build();
+        this.exchange = Mockito.spy(MockServerWebExchange.from(mockRequest));
+        ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+        this.request = exchange.getRequest();
+    }
+
+    @Test
+    public void testIsNotBinaryType() {
+        assertEquals(true, LogCollectUtils.isNotBinaryType(request.getHeaders()));
+    }
+
+    @Test
+    public void testGetHeaders() {
+        assertEquals("{\"X-source\":\"mock test\"}", LogCollectUtils.getHeaders(request.getHeaders()));
+    }
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
index 3acd0a769..e1524a5a2 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
@@ -59,6 +59,7 @@
         <module>shenyu-spring-boot-starter-plugin-mqtt</module>
         <module>shenyu-spring-boot-starter-plugin-metrics</module>
         <module>shenyu-spring-boot-starter-plugin-logging-rocketmq</module>
+        <module>shenyu-spring-boot-starter-plugin-logging-kafka</module>
         <module>shenyu-spring-boot-starter-plugin-logging-elasticsearch</module>
         <module>shenyu-spring-boot-starter-plugin-cache</module>
         <module>shenyu-spring-boot-starter-plugin-mock</module>
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/pom.xml
similarity index 51%
copy from shenyu-plugin/shenyu-plugin-logging/pom.xml
copy to shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/pom.xml
index beb8864ef..76840602e 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/pom.xml
@@ -16,21 +16,31 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.shenyu</groupId>
-        <artifactId>shenyu-plugin</artifactId>
+        <artifactId>shenyu-spring-boot-starter-plugin</artifactId>
         <version>2.5.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>shenyu-plugin-logging</artifactId>
-    <packaging>pom</packaging>
 
-    <modules>
-        <module>shenyu-plugin-logging-console</module>
-        <module>shenyu-plugin-logging-rocketmq</module>
-        <module>shenyu-plugin-logging-elasticsearch</module>
-    </modules>
+    <artifactId>shenyu-spring-boot-starter-plugin-logging-kafka</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-plugin-logging-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfiguration.java
new file mode 100644
index 000000000..6c5d12b17
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.springboot.starter.plugin.logging.kafka;
+
+import org.apache.shenyu.plugin.api.ShenyuPlugin;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin;
+import org.apache.shenyu.plugin.logging.kafka.handler.LoggingKafkaPluginDataHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Auto-configuration for the logging-kafka plugin; active unless {@code shenyu.plugins.logging-kafka.enabled} is set to a value other than {@code true}.
+ */
+@Configuration
+@ConditionalOnProperty(value = {"shenyu.plugins.logging-kafka.enabled"}, havingValue = "true", matchIfMissing = true)
+public class LoggingKafkaPluginConfiguration {
+
+    /**
+     * Registers the plugin data handler bean of the logging-kafka plugin.
+     * @return a new {@link LoggingKafkaPluginDataHandler} exposed as a {@link PluginDataHandler}
+     */
+    @Bean
+    public PluginDataHandler loggingKafkaPluginDataHandler() {
+        return new LoggingKafkaPluginDataHandler();
+    }
+
+    /**
+     * Registers the logging-kafka plugin bean.
+     * @return a new {@link LoggingKafkaPlugin} exposed as a {@link ShenyuPlugin}
+     */
+    @Bean
+    public ShenyuPlugin loggingKafkaPlugin() {
+        return new LoggingKafkaPlugin();
+    }
+
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.factories b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..3c94c46ab
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.shenyu.springboot.starter.plugin.logging.kafka.LoggingKafkaPluginConfiguration
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.provides b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.provides
new file mode 100644
index 000000000..8bf0b1e58
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.provides
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+provides: shenyu-spring-boot-starter-plugin-logging-kafka
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/test/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfigurationTest.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/test/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfigurationTest.java
new file mode 100644
index 000000000..75e005ad2
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/test/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfigurationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.springboot.starter.plugin.logging.kafka;
+
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.ShenyuPlugin;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.context.annotation.Configuration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * logging kafka plugin config test.
+ */
+@Configuration
+@EnableConfigurationProperties
+public class LoggingKafkaPluginConfigurationTest {
+
+    private ApplicationContextRunner applicationContextRunner;
+
+    @BeforeEach
+    public void before() {
+        applicationContextRunner = new ApplicationContextRunner()
+                .withConfiguration(AutoConfigurations.of(LoggingKafkaPluginConfiguration.class))
+                .withBean(LoggingKafkaPluginConfigurationTest.class);
+    }
+
+    @Test
+    public void testLoggingKafkaPlugin() {
+        applicationContextRunner
+                // The key must be the one LoggingKafkaPluginConfiguration's @ConditionalOnProperty reads;
+                // with any other key the beans are only created because matchIfMissing is true,
+                // so the test would not actually exercise the enabled=true path.
+                .withPropertyValues("debug=true", "shenyu.plugins.logging-kafka.enabled=true")
+                .run(context -> {
+                    PluginDataHandler pluginDataHandler = context.getBean("loggingKafkaPluginDataHandler", PluginDataHandler.class);
+                    assertNotNull(pluginDataHandler);
+
+                    ShenyuPlugin plugin = context.getBean("loggingKafkaPlugin", ShenyuPlugin.class);
+                    assertNotNull(plugin);
+                    assertThat(plugin.named()).isEqualTo(PluginEnum.LOGGING_KAFKA.getName());
+                });
+    }
+}