You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by zh...@apache.org on 2022/06/26 05:19:04 UTC
[incubator-shenyu] branch master updated: [ISSUE #2917] Shenyu add logging-kafka plugin (#3609)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 5b42774ce [ISSUE #2917] Shenyu add logging-kafka plugin (#3609)
5b42774ce is described below
commit 5b42774ce3b3d59bbd662eda46679040f6965754
Author: qifanyyy <45...@users.noreply.github.com>
AuthorDate: Sun Jun 26 01:18:57 2022 -0400
[ISSUE #2917] Shenyu add logging-kafka plugin (#3609)
* [GSoC 2022] Shenyu add logging-kafka plugin #2917
* Update pom.xml
* Style change
* Update pluginenum style change
* Optimize the version kafka-client to <kafka-clients.version>
---
pom.xml | 1 +
shenyu-bootstrap/pom.xml | 9 +
.../org/apache/shenyu/common/enums/PluginEnum.java | 5 +
.../src/main/release-docs/LICENSE | 1 +
shenyu-plugin/shenyu-plugin-logging/pom.xml | 1 +
.../shenyu-plugin-logging-kafka/pom.xml | 54 +++
.../plugin/logging/kafka/AbstractLogCollector.java | 124 +++++++
.../plugin/logging/kafka/DefaultLogCollector.java | 42 +++
.../shenyu/plugin/logging/kafka/LogCollector.java | 38 ++
.../plugin/logging/kafka/LogConsumeClient.java | 35 ++
.../plugin/logging/kafka/LoggingKafkaPlugin.java | 93 +++++
.../plugin/logging/kafka/body/BodyWriter.java | 106 ++++++
.../kafka/body/LoggingServerHttpRequest.java | 64 ++++
.../kafka/body/LoggingServerHttpResponse.java | 253 +++++++++++++
.../logging/kafka/config/LogCollectConfig.java | 269 ++++++++++++++
.../logging/kafka/constant/LoggingConstant.java | 32 ++
.../logging/kafka/entity/LZ4CompressData.java | 69 ++++
.../logging/kafka/entity/ShenyuRequestLog.java | 407 +++++++++++++++++++++
.../handler/LoggingKafkaPluginDataHandler.java | 154 ++++++++
.../logging/kafka/kafka/KafkaLogCollectClient.java | 147 ++++++++
.../plugin/logging/kafka/sampler/CountSampler.java | 114 ++++++
.../plugin/logging/kafka/sampler/Sampler.java | 38 ++
.../logging/kafka/utils/LogCollectConfigUtils.java | 172 +++++++++
.../logging/kafka/utils/LogCollectUtils.java | 61 +++
.../logging/kafka/DefaultLogCollectorTest.java | 58 +++
.../logging/kafka/LoggingKafkaPluginTest.java | 97 +++++
.../plugin/logging/kafka/body/BodyWriterTest.java | 70 ++++
.../kafka/body/LoggingServerHttpResponseTest.java | 162 ++++++++
.../logging/kafka/config/LogCollectConfigTest.java | 92 +++++
.../logging/kafka/entity/LZ4CompressDataTest.java | 54 +++
.../logging/kafka/entity/ShenyuRequestLogTest.java | 85 +++++
.../handler/LoggingKafkaPluginDataHandlerTest.java | 118 ++++++
.../kafka/kafka/KafkaLogCollectClientTest.java | 78 ++++
.../logging/kafka/sampler/CountSamplerTest.java | 91 +++++
.../kafka/utils/LogCollectConfigUtilsTest.java | 127 +++++++
.../logging/kafka/utils/LogCollectUtilsTest.java | 65 ++++
.../shenyu-spring-boot-starter-plugin/pom.xml | 1 +
.../pom.xml | 32 +-
.../kafka/LoggingKafkaPluginConfiguration.java | 53 +++
.../src/main/resources/META-INF/spring.factories | 19 +
.../src/main/resources/META-INF/spring.provides | 18 +
.../kafka/LoggingKafkaPluginConfigurationTest.java | 65 ++++
42 files changed, 3563 insertions(+), 11 deletions(-)
diff --git a/pom.xml b/pom.xml
index dc085c110..1f48df31f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
<jackson-databind.version>2.12.3</jackson-databind.version>
<jakarta.json-api.version>2.0.1</jakarta.json-api.version>
<elasticsearch-rest-client.version>8.2.3</elasticsearch-rest-client.version>
+ <kafka-clients.version>3.2.0</kafka-clients.version>
<!--maven plugin version-->
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.7</jacoco-maven-plugin.version>
diff --git a/shenyu-bootstrap/pom.xml b/shenyu-bootstrap/pom.xml
index eec17d6e2..b4885b76d 100644
--- a/shenyu-bootstrap/pom.xml
+++ b/shenyu-bootstrap/pom.xml
@@ -459,6 +459,14 @@
</dependency>
<!--shenyu logging-rocketmq plugin end-->
+ <!--shenyu logging-kafka plugin start-->
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-spring-boot-starter-plugin-logging-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!--shenyu logging-kafka plugin end-->
+
<!--shenyu logging-elasticsearch plugin start-->
<dependency>
<groupId>org.apache.shenyu</groupId>
@@ -466,6 +474,7 @@
<version>${project.version}</version>
</dependency>
<!--shenyu logging-elasticsearch plugin end-->
+
</dependencies>
<profiles>
<profile>
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
index ba618caee..afc218338 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
@@ -137,6 +137,11 @@ public enum PluginEnum {
*/
LOGGING_ROCKETMQ(170, 0, "loggingRocketMQ"),
+ /**
+ * Logging RocketMQ plugin enum.
+ */
+ LOGGING_KAFKA(180, 0, "loggingKafka"),
+
/**
* Logging ElasticSearch plugin enum.
*/
diff --git a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
index 1ed493d21..6408e08f4 100644
--- a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
+++ b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
@@ -392,6 +392,7 @@ The text of each license is the standard Apache 2.0 license.
tomcat-annotations-api 9.0.29: https://tomcat.apache.org/, Apache 2.0
tomcat-embed-core 9.0.29: https://tomcat.apache.org/, Apache 2.0
jackson-jr-objects 2.10.1: http://wiki.fasterxml.com/JacksonHome, Apache 2.0
+ kafka-clients 3.2.0: https://kafka.apache.org/, Apache 2.0
elasticsearch-java 8.2.3: https://github.com/elastic/elasticsearch-java, Apache 2.0
elasticsearch-rest-client 8.2.3: https://github.com/elastic/elasticsearch, Apache 2.0
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-plugin/shenyu-plugin-logging/pom.xml
index beb8864ef..767ca0f17 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-logging/pom.xml
@@ -31,6 +31,7 @@
<modules>
<module>shenyu-plugin-logging-console</module>
<module>shenyu-plugin-logging-rocketmq</module>
+ <module>shenyu-plugin-logging-kafka</module>
<module>shenyu-plugin-logging-elasticsearch</module>
</modules>
</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/pom.xml b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/pom.xml
new file mode 100644
index 000000000..52f3c64d6
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-logging</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>shenyu-plugin-logging-kafka</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-clients.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>${lz4-java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/AbstractLogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/AbstractLogCollector.java
new file mode 100644
index 000000000..c00e177f3
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/AbstractLogCollector.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.shenyu.common.concurrent.MemorySafeTaskQueue;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
+import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor;
+import org.apache.shenyu.common.config.ShenyuConfig;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.utils.ThreadUtils;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * abstract log collector,Contains common methods.
+ */
+public abstract class AbstractLogCollector implements LogCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractLogCollector.class);
+
+ private final AtomicBoolean started = new AtomicBoolean(true);
+
+ private final ShenyuConfig config = new ShenyuConfig();
+
+ private int bufferSize;
+
+ private BlockingQueue<ShenyuRequestLog> bufferQueue;
+
+ private long lastPushTime;
+
+ @Override
+ public void start() {
+ bufferSize = LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize();
+ bufferQueue = new LinkedBlockingDeque<>(bufferSize);
+ final ShenyuConfig.SharedPool sharedPool = config.getSharedPool();
+ ShenyuThreadPoolExecutor threadExecutor = new ShenyuThreadPoolExecutor(sharedPool.getCorePoolSize(),
+ sharedPool.getMaximumPoolSize(), sharedPool.getKeepAliveTime(), TimeUnit.MILLISECONDS,
+ new MemorySafeTaskQueue<>(Constants.THE_256_MB),
+ ShenyuThreadFactory.create(config.getSharedPool().getPrefix(), true),
+ new ThreadPoolExecutor.AbortPolicy());
+ started.set(true);
+ threadExecutor.execute(this::consume);
+ }
+
+ @Override
+ public void collect(final ShenyuRequestLog log) {
+ if (Objects.isNull(log) || Objects.isNull(getLogConsumeClient())) {
+ return;
+ }
+ if (bufferQueue.size() < bufferSize) {
+ bufferQueue.add(log);
+ }
+ }
+
+ /**
+ * batch and async consume.
+ */
+ private void consume() {
+ while (started.get()) {
+ int diffTimeMSForPush = 100;
+ try {
+ List<ShenyuRequestLog> logs = new ArrayList<>();
+ int size = bufferQueue.size();
+ long time = System.currentTimeMillis();
+ long timeDiffMs = time - lastPushTime;
+ int batchSize = 100;
+ if (size >= batchSize || timeDiffMs > diffTimeMSForPush) {
+ bufferQueue.drainTo(logs, batchSize);
+ LogConsumeClient logCollectClient = getLogConsumeClient();
+ if (logCollectClient != null) {
+ logCollectClient.consume(logs);
+ }
+ lastPushTime = time;
+ } else {
+ ThreadUtils.sleep(TimeUnit.MILLISECONDS, diffTimeMSForPush);
+ }
+ } catch (Exception e) {
+ LOG.error("DefaultLogCollector collect log error", e);
+ ThreadUtils.sleep(TimeUnit.MILLISECONDS, diffTimeMSForPush);
+ }
+ }
+ }
+
+ /**
+ * get log consume client.
+ *
+ * @return log consume client
+ */
+ protected abstract LogConsumeClient getLogConsumeClient();
+
+ @Override
+ public void close() throws Exception {
+ started.set(false);
+ LogConsumeClient logCollectClient = getLogConsumeClient();
+ if (logCollectClient != null) {
+ logCollectClient.close();
+ }
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollector.java
new file mode 100644
index 000000000..29d3d4cab
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import org.apache.shenyu.plugin.logging.kafka.handler.LoggingKafkaPluginDataHandler;
+
+/**
+ * default log collector,depend a LogConsumeClient for consume logs.
+ */
+public class DefaultLogCollector extends AbstractLogCollector {
+
+ private static final LogCollector INSTANCE = new DefaultLogCollector();
+
+ /**
+ * get LogCollector instance.
+ *
+ * @return LogCollector instance
+ */
+ public static LogCollector getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ protected LogConsumeClient getLogConsumeClient() {
+ return LoggingKafkaPluginDataHandler.getKafkaLogCollectClient();
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogCollector.java
new file mode 100644
index 000000000..f4397f528
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogCollector.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+
+/**
+ * Collect logs and put into buffer queue.
+ */
+public interface LogCollector extends AutoCloseable {
+
+ /**
+ * start log collector.
+ */
+ void start();
+
+ /**
+ * collect log.
+ *
+ * @param log access log
+ */
+ void collect(ShenyuRequestLog log);
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogConsumeClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogConsumeClient.java
new file mode 100644
index 000000000..5d0abc0b7
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LogConsumeClient.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.util.List;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+
+/**
+ * Used to collect logs, which can be stored in remote or local files or databases, or others.
+ */
+public interface LogConsumeClient extends AutoCloseable {
+
+ /**
+ * collect logs.
+ *
+ * @param logs list of log
+ * @throws Exception produce exception
+ */
+ void consume(List<ShenyuRequestLog> logs) throws Exception;
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
new file mode 100644
index 000000000..4c3e4ca95
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPlugin.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
+import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.apache.shenyu.plugin.logging.kafka.body.LoggingServerHttpRequest;
+import org.apache.shenyu.plugin.logging.kafka.body.LoggingServerHttpResponse;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+import static org.apache.shenyu.common.enums.PluginEnum.LOGGING_KAFKA;
+
+/**
+ * Integrated kafka collect log.
+ */
+public class LoggingKafkaPlugin extends AbstractShenyuPlugin {
+
+ private static final String USER_AGENT = "User-Agent";
+
+ private static final String HOST = "Host";
+
+ @Override
+ protected Mono<Void> doExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain,
+ final SelectorData selector, final RuleData rule) {
+ ServerHttpRequest request = exchange.getRequest();
+ // control sampling
+ if (!LogCollectConfigUtils.isSampled(exchange.getRequest())) {
+ return chain.execute(exchange);
+ }
+
+ ShenyuRequestLog requestInfo = new ShenyuRequestLog();
+ requestInfo.setRequestUri(request.getURI().toString());
+ requestInfo.setMethod(request.getMethodValue());
+ requestInfo.setRequestHeader(LogCollectUtils.getHeaders(request.getHeaders()));
+ requestInfo.setQueryParams(request.getURI().getQuery());
+ requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
+ requestInfo.setUserAgent(request.getHeaders().getFirst(USER_AGENT));
+ requestInfo.setHost(request.getHeaders().getFirst(HOST));
+ requestInfo.setPath(request.getURI().getPath());
+
+ LoggingServerHttpRequest loggingServerHttpRequest = new LoggingServerHttpRequest(request, requestInfo);
+ LoggingServerHttpResponse loggingServerHttpResponse = new LoggingServerHttpResponse(exchange.getResponse(),
+ requestInfo, DefaultLogCollector.getInstance());
+ ServerWebExchange webExchange = exchange.mutate().request(loggingServerHttpRequest)
+ .response(loggingServerHttpResponse).build();
+ loggingServerHttpResponse.setExchange(webExchange);
+
+ return chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ }
+
+ /**
+ * get plugin order.
+ *
+ * @return order
+ */
+ @Override
+ public int getOrder() {
+ return LOGGING_KAFKA.getCode();
+ }
+
+ /**
+ * get plugin name.
+ *
+ * @return plugin name
+ */
+ @Override
+ public String named() {
+ return LOGGING_KAFKA.getName();
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriter.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriter.java
new file mode 100644
index 000000000..e0660038f
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * bodyWriter is used to read Body.
+ */
+public class BodyWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BodyWriter.class);
+
+ private final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+ private final WritableByteChannel channel = Channels.newChannel(stream);
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+
+ /**
+ * write ByteBuffer.
+ *
+ * @param buffer byte buffer
+ */
+ public void write(final ByteBuffer buffer) {
+ if (!isClosed.get()) {
+ try {
+ channel.write(buffer);
+ } catch (IOException e) {
+ isClosed.compareAndSet(false, true);
+ LOG.error("write buffer Failed.", e);
+ }
+ }
+ }
+
+ /**
+ * judge stream is empty.
+ *
+ * @return true: stream is empty
+ */
+ boolean isEmpty() {
+ return stream.size() == 0;
+ }
+
+ /**
+ * get stream size.
+ *
+ * @return size of stream
+ */
+ public int size() {
+ return stream.size();
+ }
+
+ /**
+ * output stream value.
+ *
+ * @return string of stream
+ */
+ public String output() {
+ if (isEmpty()) {
+ return "";
+ }
+ try {
+ isClosed.compareAndSet(false, true);
+ return new String(stream.toByteArray(), StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ LOG.error("Write failed: ", e);
+ return "Write failed: " + e.getMessage();
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ LOG.error("Close stream error: ", e);
+ }
+ try {
+ channel.close();
+ } catch (IOException e) {
+ LOG.error("Close channel error: ", e);
+ }
+ }
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpRequest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpRequest.java
new file mode 100644
index 000000000..a19f02125
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
+import reactor.core.publisher.Flux;
+import reactor.util.annotation.NonNull;
+
+/**
+ * decorate ServerHttpRequest for read body.
+ */
+public class LoggingServerHttpRequest extends ServerHttpRequestDecorator {
+
+ private final ShenyuRequestLog logInfo;
+
+ public LoggingServerHttpRequest(final ServerHttpRequest delegate, final ShenyuRequestLog logInfo) {
+ super(delegate);
+ this.logInfo = logInfo;
+ }
+
+ /**
+ * get request body.
+ *
+ * @return Flux
+ */
+ @Override
+ @NonNull
+ public Flux<DataBuffer> getBody() {
+ BodyWriter writer = new BodyWriter();
+ return super.getBody().doOnNext(dataBuffer -> {
+ if (LogCollectUtils.isNotBinaryType(getHeaders())) {
+ writer.write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
+ }
+ }).doFinally(signal -> {
+ int size = writer.size();
+ String body = writer.output();
+ boolean requestBodyTooLarge = LogCollectConfigUtils.isRequestBodyTooLarge(size);
+ if (size == 0 || requestBodyTooLarge) {
+ return;
+ }
+ logInfo.setRequestBody(body);
+ });
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponse.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponse.java
new file mode 100644
index 000000000..1cbe9a085
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponse.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.common.utils.DateUtils;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.result.ShenyuResult;
+import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
+import org.apache.shenyu.plugin.logging.kafka.DefaultLogCollector;
+import org.apache.shenyu.plugin.logging.kafka.LogCollector;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import org.springframework.web.server.ResponseStatusException;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.annotation.NonNull;
+
+/**
+ * decorate ServerHttpResponse for read body.
+ */
+public class LoggingServerHttpResponse extends ServerHttpResponseDecorator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingServerHttpResponse.class);
+
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+ private final ShenyuRequestLog logInfo;
+
+ private final LogCollector logCollector;
+
+ private ServerWebExchange exchange;
+
+ /**
+ * Constructor LoggingServerHttpResponse.
+ *
+ * @param delegate delegate ServerHttpResponse
+ * @param logInfo access log
+ * @param logCollector LogCollector instance
+ */
+ public LoggingServerHttpResponse(final ServerHttpResponse delegate, final ShenyuRequestLog logInfo,
+ final LogCollector logCollector) {
+ super(delegate);
+ this.logInfo = logInfo;
+ this.logCollector = logCollector;
+ }
+
+ /**
+ * set relevant ServerWebExchange.
+ *
+ * @param exchange ServerWebExchange
+ */
+ public void setExchange(final ServerWebExchange exchange) {
+ this.exchange = exchange;
+ }
+
+ /**
+ * write with a publisher.
+ *
+ * @param body response body
+ * @return Mono
+ */
+ @Override
+ @NonNull
+ public Mono<Void> writeWith(@NonNull final Publisher<? extends DataBuffer> body) {
+ return super.writeWith(appendResponse(body));
+ }
+
+ /**
+ * append response.
+ *
+ * @param body publisher
+ * @return wrap Flux
+ */
+ @NonNull
+ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends DataBuffer> body) {
+ ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+ assert shenyuContext != null;
+ if (getStatusCode() != null) {
+ logInfo.setStatus(getStatusCode().value());
+ }
+ logInfo.setResponseHeader(LogCollectUtils.getHeaders(getHeaders()));
+ BodyWriter writer = new BodyWriter();
+ logInfo.setTraceId(getTraceId());
+ return Flux.from(body).doOnNext(buffer -> {
+ if (LogCollectUtils.isNotBinaryType(getHeaders())) {
+ writer.write(buffer.asByteBuffer().asReadOnlyBuffer());
+ }
+ }).doFinally(signal -> logResponse(shenyuContext, writer));
+ }
+
+ /**
+ * record response log.
+ *
+ * @param shenyuContext request context
+ * @param writer bodyWriter
+ */
+ private void logResponse(final ShenyuContext shenyuContext, final BodyWriter writer) {
+ if (StringUtils.isNotBlank(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH))) {
+ String size = StringUtils.defaultIfEmpty(getHeaders().getFirst(HttpHeaders.CONTENT_LENGTH), "0");
+ logInfo.setResponseContentLength(Integer.parseInt(size));
+ } else {
+ logInfo.setResponseContentLength(writer.size());
+ }
+ logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
+ logInfo.setModule(shenyuContext.getModule());
+ long costTime = DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(), LocalDateTime.now());
+ logInfo.setUpstreamResponseTime(costTime);
+ logInfo.setMethod(shenyuContext.getMethod());
+ logInfo.setRpcType(shenyuContext.getRpcType());
+ if (StringUtils.isNotBlank(shenyuContext.getRpcType())) {
+ logInfo.setUpstreamIp(getUpstreamIp());
+ }
+ int size = writer.size();
+ String body = writer.output();
+ if (size > 0 && !LogCollectConfigUtils.isResponseBodyTooLarge(size)) {
+ logInfo.setResponseBody(body);
+ }
+ // collect log
+ if (logCollector != null) {
+ logCollector.collect(logInfo);
+ }
+ }
+
+ /**
+ * get upstream ip.
+ *
+ * @return upstream ip
+ */
+ private String getUpstreamIp() {
+ ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+ assert shenyuContext != null;
+ if (RpcTypeEnum.HTTP.getName().equals(shenyuContext.getRpcType())) {
+ URI uri = exchange.getAttribute(Constants.HTTP_URI);
+ if (uri != null) {
+ return uri.getHost();
+ } else {
+ return getUpstreamIpFromHttpDomain();
+ }
+ } else {
+ String domain = (String) exchange.getAttributes().get(Constants.HTTP_DOMAIN);
+ if (StringUtils.isNotBlank(domain)) {
+ return getUpstreamIpFromHttpDomain();
+ }
+ // The current context is difficult to obtain the upstream IP of grpc and Dubbo. need change plugin code.
+ }
+ return "";
+ }
+
+ /**
+ * Encourage developers to provide plugins to upstream like SkyWalking, ZipKin and OpenTelemetry
+ * to implement the tracing features. These plug-ins can set a traceId to the context.
+ *
+ * @return traceId
+ */
+ private String getTraceId() {
+ return (String) exchange.getAttributes().get(LoggingConstant.SHENYU_AGENT_TRACE_ID);
+ }
+
+ /**
+ * collect access error.
+ *
+ * @param throwable Exception occurred。
+ */
+ public void logError(final Throwable throwable) {
+ HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
+ if (throwable instanceof ResponseStatusException) {
+ httpStatus = ((ResponseStatusException) throwable).getStatus();
+ }
+ logInfo.setStatus(httpStatus.value());
+ logInfo.setTraceId(getTraceId());
+ // Do not collect stack
+ Object result = ShenyuResultWrap.error(exchange, httpStatus.value(),
+ httpStatus.getReasonPhrase(), throwable.getMessage());
+ final ShenyuResult<?> shenyuResult = ShenyuResultWrap.shenyuResult();
+ Object resultData = shenyuResult.format(exchange, result);
+ final Object responseData = shenyuResult.result(exchange, resultData);
+ assert null != responseData;
+ final byte[] bytes = (responseData instanceof byte[])
+ ? (byte[]) responseData
+ : responseData.toString().getBytes(StandardCharsets.UTF_8);
+ logInfo.setResponseContentLength(bytes.length);
+ ShenyuContext shenyuContext = exchange.getAttribute(Constants.CONTEXT);
+ assert shenyuContext != null;
+ logInfo.setTimeLocal(shenyuContext.getStartDateTime().format(DATE_TIME_FORMATTER));
+ logInfo.setModule(shenyuContext.getModule());
+ long costTime = DateUtils.acquireMillisBetween(shenyuContext.getStartDateTime(), LocalDateTime.now());
+ logInfo.setUpstreamResponseTime(costTime);
+ logInfo.setResponseHeader(LogCollectUtils.getHeaders(exchange.getResponse().getHeaders()));
+ logInfo.setRpcType(shenyuContext.getRpcType());
+ logInfo.setMethod(shenyuContext.getMethod());
+ if (StringUtils.isNotBlank(shenyuContext.getRpcType())) {
+ logInfo.setUpstreamIp(getUpstreamIp());
+ }
+
+ int size = bytes.length;
+ String body = new String(bytes, StandardCharsets.UTF_8);
+ if (size > 0 && !LogCollectConfigUtils.isResponseBodyTooLarge(size)) {
+ logInfo.setResponseBody(body);
+ }
+ // collect log
+ if (DefaultLogCollector.getInstance() != null) {
+ DefaultLogCollector.getInstance().collect(logInfo);
+ }
+ }
+
+ private String getUpstreamIpFromHttpDomain() {
+ String domain = (String) exchange.getAttributes().get(Constants.HTTP_DOMAIN);
+ try {
+ if (StringUtils.isNotBlank(domain)) {
+ URL url = new URL(domain);
+ return url.getHost();
+ }
+ } catch (Exception e) {
+ LOG.error("get upstream ip error");
+ }
+ return "";
+ }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfig.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfig.java
new file mode 100644
index 000000000..12d1b3350
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfig.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.config;
+
+/**
+ * log collect config, include kafka config.
+ * Topic and nameserver must be included, and others are optional.
+ * We should operate the configuration through admin instead of the configuration file.
+ */
+public class LogCollectConfig {
+
+ private GlobalLogConfig globalLogConfig;
+
+ /**
+ * get global log config.
+ *
+ * @return global log config
+ */
+ public GlobalLogConfig getGlobalLogConfig() {
+ return globalLogConfig;
+ }
+
+ /**
+ * set global log config.
+ *
+ * @param globalLogConfig global log config.
+ */
+ public void setGlobalLogConfig(final GlobalLogConfig globalLogConfig) {
+ this.globalLogConfig = globalLogConfig;
+ }
+
+ /**
+ * global log config.
+ */
+ public static class GlobalLogConfig {
+ private String topic;
+
+ private String namesrvAddr;
+
+ private String producerGroup;
+
+ private String sampleRate = "1";
+
+ private String compressAlg;
+
+ /**
+ * default 512KB.
+ */
+ private int maxResponseBody = 524288;
+
+ /**
+ * default 512kb.
+ */
+ private int maxRequestBody = 524288;
+
+ private int bufferQueueSize = 50000;
+
+ /**
+ * get sample rate.
+ *
+ * @return sample
+ */
+ public String getSampleRate() {
+ return sampleRate;
+ }
+
+ /**
+ * set sample rate.
+ *
+ * @param sampleRate rate
+ */
+ public void setSampleRate(final String sampleRate) {
+ this.sampleRate = sampleRate;
+ }
+
+ /**
+ * whether compress.
+ *
+ * @return compress or not
+ */
+ public String getCompressAlg() {
+ return compressAlg;
+ }
+
+ /**
+ * set compress.
+ *
+ * @param compressAlg compress alg.
+ */
+ public void setCompressAlg(final String compressAlg) {
+ this.compressAlg = compressAlg;
+ }
+
+ /**
+ * get message queue topic.
+ *
+ * @return message queue topic
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * topic,used for message queue.
+ *
+ * @param topic mq topic
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * get max response body.
+ *
+ * @return get max response body
+ */
+ public int getMaxResponseBody() {
+ return maxResponseBody;
+ }
+
+ /**
+ * set max response body.
+ *
+ * @param maxResponseBody max response body
+ */
+ public void setMaxResponseBody(final int maxResponseBody) {
+ this.maxResponseBody = maxResponseBody;
+ }
+
+ /**
+ * get max request body.
+ *
+ * @return max request body
+ */
+ public int getMaxRequestBody() {
+ return maxRequestBody;
+ }
+
+ /**
+ * set max request body.
+ *
+ * @param maxRequestBody max request body
+ */
+ public void setMaxRequestBody(final int maxRequestBody) {
+ this.maxRequestBody = maxRequestBody;
+ }
+
+ /**
+ * get buffer queue size.
+ *
+ * @return buffer queue size
+ */
+ public int getBufferQueueSize() {
+ return bufferQueueSize;
+ }
+
+ /**
+ * set buffer queue size.
+ *
+ * @param bufferQueueSize buffer queue size
+ */
+ public void setBufferQueueSize(final int bufferQueueSize) {
+ this.bufferQueueSize = bufferQueueSize;
+ }
+
+ /**
+ * get kafka nameserver address.
+ *
+ * @return kafka nameserver address
+ */
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ /**
+ * set kafka nameserver address.
+ * @param namesrvAddr kafka nameserver address
+ */
+ public void setNamesrvAddr(final String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ /**
+ * get producer group.
+ *
+ * @return producer group
+ */
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ /**
+ * set producer group.
+ *
+ * @param producerGroup producer group
+ */
+ public void setProducerGroup(final String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+ }
+
+ /**
+ * api log config.
+ */
+ public static class LogApiConfig {
+
+ /**
+ * 0 means never sample, 1 means always sample. Minimum probability is 0.01, or 1% of logging
+ */
+ private String sampleRate;
+
+ /**
+ * This topic is useful if you use message queuing to collect logs.
+ */
+ private String topic;
+
+ /**
+ * get sample rate.
+ *
+ * @return sample rate
+ */
+ public String getSampleRate() {
+ return sampleRate;
+ }
+
+ /**
+ * set sample rate.
+ *
+ * @param sampleRate sample rate
+ */
+ public void setSampleRate(final String sampleRate) {
+ this.sampleRate = sampleRate;
+ }
+
+ /**
+ * get mq topic.
+ *
+ * @return mq topic
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * set mq topic.
+ *
+ * @param topic mq topic
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+ }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/constant/LoggingConstant.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/constant/LoggingConstant.java
new file mode 100644
index 000000000..1083aaa81
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/constant/LoggingConstant.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.constant;
+
+/**
+ * Some log related property constants.
+ */
+public final class LoggingConstant {
+
+ public static final String TOPIC = "topic";
+
+ public static final String NAMESERVER_ADDRESS = "namesrvAddr";
+
+ public static final String SHENYU_AGENT_TRACE_ID = "shenyu-agent-trace-id";
+
+}
+
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressData.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressData.java
new file mode 100644
index 000000000..e6c4ee6a8
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+/**
+ * Lz4 compressed data.
+ */
+public class LZ4CompressData {
+
+ private int length;
+
+ private byte[] compressedData;
+
+ public LZ4CompressData(final int length, final byte[] compressedData) {
+ this.length = length;
+ this.compressedData = compressedData;
+ }
+
+ /**
+ * get the exact size of the original input.
+ *
+ * @return the exact size of the original input
+ */
+ public int getLength() {
+ return length;
+ }
+
+ /**
+ * set the exact size of the original input.
+ *
+ * @param length the exact size of the original input.
+ */
+ public void setLength(final int length) {
+ this.length = length;
+ }
+
+ /**
+ * get the compressed data.
+ *
+ * @return the compressed data
+ */
+ public byte[] getCompressedData() {
+ return compressedData;
+ }
+
+ /**
+ * set the compressed data.
+ *
+ * @param compressedData the compressed data
+ */
+ public void setCompressedData(final byte[] compressedData) {
+ this.compressedData = compressedData;
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLog.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLog.java
new file mode 100644
index 000000000..ffd44afaa
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLog.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+/**
+ * shenyu gateway access log.
+ */
+public class ShenyuRequestLog {
+
+ private String clientIp;
+
+ private String timeLocal;
+
+ private String method;
+
+ private String requestHeader;
+
+ private String responseHeader;
+
+ private String queryParams;
+
+ private String requestBody;
+
+ private String requestUri;
+
+ private String responseBody;
+
+ private int responseContentLength;
+
+ private String rpcType;
+
+ private int status;
+
+ private String upstreamIp;
+
+ private long upstreamResponseTime;
+
+ private String userAgent;
+
+ private String host;
+
+ private String module;
+
+ private String traceId;
+
+ /**
+ * path.
+ */
+ private String path;
+
+ /**
+ * get module.
+ *
+ * @return module
+ */
+ public String getModule() {
+ return module;
+ }
+
+ /**
+ * set module.
+ *
+ * @param module module
+ */
+ public void setModule(final String module) {
+ this.module = module;
+ }
+
+ /**
+ * get responseContentLength.
+ *
+ * @return ResponseContentLength
+ */
+ public int getResponseContentLength() {
+ return responseContentLength;
+ }
+
+ /**
+ * set ResponseContentLength.
+ *
+ * @param responseContentLength ResponseContentLength
+ */
+ public void setResponseContentLength(final int responseContentLength) {
+ this.responseContentLength = responseContentLength;
+ }
+
+ /**
+ * get userAgent.
+ *
+ * @return userAgent
+ */
+ public String getUserAgent() {
+ return userAgent;
+ }
+
+ /**
+ * set userAgent.
+ *
+ * @param userAgent userAgent
+ */
+ public void setUserAgent(final String userAgent) {
+ this.userAgent = userAgent;
+ }
+
+ /**
+ * get host.
+ *
+ * @return host
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * set host.
+ *
+ * @param host host
+ */
+ public void setHost(final String host) {
+ this.host = host;
+ }
+
+ /**
+ * get clientIp.
+ *
+ * @return clientIp
+ */
+ public String getClientIp() {
+ return clientIp;
+ }
+
+ /**
+ * set clientIp.
+ *
+ * @param clientIp clientIp
+ */
+ public void setClientIp(final String clientIp) {
+ this.clientIp = clientIp;
+ }
+
+ /**
+ * get timeLocal.
+ *
+ * @return timeLocal
+ */
+ public String getTimeLocal() {
+ return timeLocal;
+ }
+
+ /**
+ * set timeLocal.
+ *
+ * @param timeLocal timeLocal
+ */
+ public void setTimeLocal(final String timeLocal) {
+ this.timeLocal = timeLocal;
+ }
+
+ /**
+ * get method.
+ *
+ * @return method
+ */
+ public String getMethod() {
+ return method;
+ }
+
+ /**
+ * set method.
+ *
+ * @param method method
+ */
+ public void setMethod(final String method) {
+ this.method = method;
+ }
+
+ /**
+ * get requestHeader.
+ *
+ * @return requestHeader
+ */
+ public String getRequestHeader() {
+ return requestHeader;
+ }
+
+ /**
+ * set requestHeader.
+ *
+ * @param requestHeader requestHeader
+ */
+ public void setRequestHeader(final String requestHeader) {
+ this.requestHeader = requestHeader;
+ }
+
+ /**
+ * get responseHeader.
+ *
+ * @return responseHeader
+ */
+ public String getResponseHeader() {
+ return responseHeader;
+ }
+
+ /**
+ * set responseHeader.
+ *
+ * @param responseHeader responseHeader
+ */
+ public void setResponseHeader(final String responseHeader) {
+ this.responseHeader = responseHeader;
+ }
+
+ /**
+ * get queryParams.
+ *
+ * @return queryParams
+ */
+ public String getQueryParams() {
+ return queryParams;
+ }
+
+ /**
+ * set queryParams.
+ *
+ * @param queryParams queryParams
+ */
+ public void setQueryParams(final String queryParams) {
+ this.queryParams = queryParams;
+ }
+
+ /**
+ * get requestBody.
+ *
+ * @return requestBody
+ */
+ public String getRequestBody() {
+ return requestBody;
+ }
+
+ /**
+ * set requestBody.
+ *
+ * @param requestBody requestBody
+ */
+ public void setRequestBody(final String requestBody) {
+ this.requestBody = requestBody;
+ }
+
+ /**
+ * get requestUri.
+ *
+ * @return requestUri
+ */
+ public String getRequestUri() {
+ return requestUri;
+ }
+
+ /**
+ * set requestUri.
+ *
+ * @param requestUri requestUri
+ */
+ public void setRequestUri(final String requestUri) {
+ this.requestUri = requestUri;
+ }
+
+ /**
+ * get responseBody.
+ *
+ * @return responseBody
+ */
+ public String getResponseBody() {
+ return responseBody;
+ }
+
+ /**
+ * set responseBody.
+ *
+ * @param responseBody responseBody
+ */
+ public void setResponseBody(final String responseBody) {
+ this.responseBody = responseBody;
+ }
+
+ /**
+ * get rpcType.
+ *
+ * @return rpcType
+ */
+ public String getRpcType() {
+ return rpcType;
+ }
+
+ /**
+ * set rpcType.
+ *
+ * @param rpcType rpcType
+ */
+ public void setRpcType(final String rpcType) {
+ this.rpcType = rpcType;
+ }
+
+ /**
+ * set status.
+ *
+ * @return status
+ */
+ public int getStatus() {
+ return status;
+ }
+
+ /**
+ * set status.
+ *
+ * @param status status
+ */
+ public void setStatus(final int status) {
+ this.status = status;
+ }
+
+ /**
+ * get upstreamIp.
+ *
+ * @return upstreamIp
+ */
+ public String getUpstreamIp() {
+ return upstreamIp;
+ }
+
+ /**
+ * set upstreamIp.
+ *
+ * @param upstreamIp upstreamIp
+ */
+ public void setUpstreamIp(final String upstreamIp) {
+ this.upstreamIp = upstreamIp;
+ }
+
+ /**
+ * get upstreamResponseTime.
+ *
+ * @return upstreamResponseTime
+ */
+ public long getUpstreamResponseTime() {
+ return upstreamResponseTime;
+ }
+
+ /**
+ * set UpstreamResponseTime.
+ *
+ * @param upstreamResponseTime upstreamResponseTime
+ */
+ public void setUpstreamResponseTime(final long upstreamResponseTime) {
+ this.upstreamResponseTime = upstreamResponseTime;
+ }
+
+ /**
+ * get traceId.
+ *
+ * @return traceId
+ */
+ public String getTraceId() {
+ return traceId;
+ }
+
+ /**
+ * set traceId.
+ *
+ * @param traceId tracing id
+ */
+ public void setTraceId(final String traceId) {
+ this.traceId = traceId;
+ }
+
+ /**
+ * get request path.
+ *
+ * @return request path
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * request path.
+ *
+ * @param path request path
+ */
+ public void setPath(final String path) {
+ this.path = path;
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
new file mode 100644
index 000000000..49b9aa84e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.handler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.common.enums.SelectorTypeEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.kafka.DefaultLogCollector;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.kafka.KafkaLogCollectClient;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The type logging kafka plugin data handler.
+ */
+public class LoggingKafkaPluginDataHandler implements PluginDataHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingKafkaPluginDataHandler.class);
+
+ private static final KafkaLogCollectClient KAFKA_LOG_COLLECT_CLIENT = new KafkaLogCollectClient();
+
+ private static final String EMPTY_JSON = "{}";
+
+ private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP = new ConcurrentHashMap<>();
+
+ private static final Map<String, LogCollectConfig.LogApiConfig> SELECT_API_CONFIG_MAP = new ConcurrentHashMap<>();
+
+ /**
+ * get kafka log collect client.
+ *
+ * @return kafka log collect client.
+ */
+ public static KafkaLogCollectClient getKafkaLogCollectClient() {
+ return KAFKA_LOG_COLLECT_CLIENT;
+ }
+
+ /**
+ * get selectId uriList map.
+ *
+ * @return selectId uriList map
+ */
+ public static Map<String, List<String>> getSelectIdUriListMap() {
+ return SELECT_ID_URI_LIST_MAP;
+ }
+
+ /**
+ * get select api config map.
+ *
+ * @return select api config map
+ */
+ public static Map<String, LogCollectConfig.LogApiConfig> getSelectApiConfigMap() {
+ return SELECT_API_CONFIG_MAP;
+ }
+
+ /**
+ * start or close kafka client.
+ */
+ @Override
+ public void handlerPlugin(final PluginData pluginData) {
+ LOG.info("handler loggingKafka Plugin data:{}", GsonUtils.getGson().toJson(pluginData));
+ if (pluginData.getEnabled()) {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
+ LogCollectConfig.GlobalLogConfig.class);
+
+ LogCollectConfigUtils.setGlobalConfig(globalLogConfig);
+ // start kafka producer
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.put("bootstrap.servers", globalLogConfig.getNamesrvAddr());
+ properties.put(LoggingConstant.TOPIC, globalLogConfig.getTopic());
+ properties.put(LoggingConstant.NAMESERVER_ADDRESS, globalLogConfig.getTopic());
+ KAFKA_LOG_COLLECT_CLIENT.initProducer(properties);
+ DefaultLogCollector.getInstance().start();
+ } else {
+ try {
+ DefaultLogCollector.getInstance().close();
+ } catch (Exception e) {
+ LOG.error("close log collector error", e);
+ }
+ }
+ }
+
+ @Override
+ public void handlerSelector(final SelectorData selectorData) {
+ LOG.info("handler loggingKafka selector data:{}", GsonUtils.getGson().toJson(selectorData));
+ String handleJson = selectorData.getHandle();
+ if (StringUtils.isEmpty(handleJson) || EMPTY_JSON.equals(handleJson.trim())) {
+ return;
+ }
+ if (selectorData.getType() != SelectorTypeEnum.CUSTOM_FLOW.getCode()
+ || CollectionUtils.isEmpty(selectorData.getConditionList())) {
+ return;
+ }
+
+ LogCollectConfig.LogApiConfig logApiConfig = GsonUtils.getInstance().fromJson(handleJson,
+ LogCollectConfig.LogApiConfig.class);
+ if (StringUtils.isBlank(logApiConfig.getTopic()) || StringUtils.isBlank(logApiConfig.getSampleRate())) {
+ return;
+ }
+ List<String> uriList = new ArrayList<>();
+ for (ConditionData conditionData : selectorData.getConditionList()) {
+ if ("uri".equals(conditionData.getParamType()) && StringUtils.isNotBlank(conditionData.getParamValue())
+ && ("match".equals(conditionData.getOperator()) || "=".equals(conditionData.getOperator()))) {
+ uriList.add(conditionData.getParamValue().trim());
+ }
+ }
+ SELECT_ID_URI_LIST_MAP.put(selectorData.getId(), uriList);
+ SELECT_API_CONFIG_MAP.put(selectorData.getId(), logApiConfig);
+ }
+
+ @Override
+ public void removeSelector(final SelectorData selectorData) {
+ LOG.info("handler remove loggingKafka selector data:{}", GsonUtils.getGson().toJson(selectorData));
+ SELECT_ID_URI_LIST_MAP.remove(selectorData.getId());
+ SELECT_API_CONFIG_MAP.remove(selectorData.getId());
+ }
+
+ @Override
+ public String pluginNamed() {
+ return PluginEnum.LOGGING_KAFKA.getName();
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClient.java
new file mode 100644
index 000000000..2464cd1c9
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.plugin.logging.kafka.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.LZ4CompressData;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * queue-based logging collector.
+ */
+public class KafkaLogCollectClient implements LogConsumeClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaLogCollectClient.class);
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ private KafkaProducer<String, String> producer;
+
+ private String topic;
+
+ /**
+ * init producer.
+ *
+ * @param props kafka props
+ */
+ public void initProducer(final Properties props) {
+ if (MapUtils.isEmpty(props)) {
+ LOG.error("kafka props is empty. failed init kafka producer");
+ return;
+ }
+ if (isStarted.get()) {
+ close();
+ }
+ String topic = props.getProperty(LoggingConstant.TOPIC);
+ String nameserverAddress = props.getProperty("bootstrap.servers");
+
+ if (StringUtils.isBlank(topic) || StringUtils.isBlank(nameserverAddress)) {
+ LOG.error("init kafkaLogCollectClient error, please check topic or nameserverAddress");
+ return;
+ }
+ this.topic = topic;
+
+ producer = new KafkaProducer<String, String>(props);
+
+ ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, "shenyu-access-logging");
+
+ try {
+ producer.send(record);
+ LOG.info("init kafkaLogCollectClient success");
+ isStarted.set(true);
+ } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ // We can't recover from these exceptions, so our only option is to close the producer and exit.
+ LOG.error("Init kafkaLogCollectClient error, We can't recover from these exceptions, so our only option is to close the producer and exit", e);
+ producer.close();
+ } catch (KafkaException e) {
+ // For all other exceptions, just abort the transaction and try again.
+ LOG.error(
+ "init kafkaLogCollectClient error,Exceptions other than ProducerFencedException or OutOfOrderSequenceException or AuthorizationException, just abort the transaction and try again", e);
+ }
+ }
+
+ /**
+ * store logs.
+ *
+ * @param logs list of log
+ */
+ @Override
+ public void consume(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ return;
+ }
+ logs.forEach(log -> {
+ String logTopic = StringUtils.defaultIfBlank(LogCollectConfigUtils.getTopic(log.getPath()), topic);
+ try {
+ producer.send(toProducerRecord(logTopic, log));
+ } catch (Exception e) {
+ LOG.error("kafka push logs error", e);
+ }
+ });
+ }
+
+ private ProducerRecord<String, String> toProducerRecord(final String logTopic, final ShenyuRequestLog log) {
+ byte[] bytes = JsonUtils.toJson(log).getBytes(StandardCharsets.UTF_8);
+ String compressAlg = StringUtils.defaultIfBlank(LogCollectConfigUtils.getGlobalLogConfig().getCompressAlg(), "");
+ if ("LZ4".equalsIgnoreCase(compressAlg.trim())) {
+ LZ4CompressData lz4CompressData = new LZ4CompressData(bytes.length, compressedByte(bytes));
+ return new ProducerRecord<>(logTopic, JsonUtils.toJson(lz4CompressData));
+
+ } else {
+ return new ProducerRecord<>(logTopic, JsonUtils.toJson(log));
+ }
+ }
+
+ private byte[] compressedByte(final byte[] srcByte) {
+ LZ4Factory factory = LZ4Factory.fastestInstance();
+ LZ4Compressor compressor = factory.fastCompressor();
+ return compressor.compress(srcByte);
+ }
+
+
+ /**
+ * close producer.
+ */
+ @Override
+ public void close() {
+ if (producer != null && isStarted.get()) {
+ producer.close();
+ isStarted.set(false);
+ }
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSampler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSampler.java
new file mode 100644
index 000000000..3a42cd2ad
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSampler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.sampler;
+
+import java.util.BitSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+/**
+ * used for sample log.
+ * reference resources: http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+ */
+public class CountSampler implements Sampler {
+
+ private final AtomicInteger counter;
+
+ private final BitSet sampleDecisions;
+
+ /**
+ * Fills a bitset with decisions according to the supplied probability.
+ */
+ CountSampler(final float probability) {
+ counter = new AtomicInteger();
+ int percent = (int) (probability * 100.0f);
+ this.sampleDecisions = genRandomBitSet(100, percent);
+ }
+
+ /**
+ * create a sampler instance.
+ *
+ * @param probability probability
+ * @return sampler instance
+ */
+ public static Sampler create(final String probability) {
+ if (StringUtils.isBlank(probability)) {
+ return ALWAYS_SAMPLE;
+ }
+ if ("0".equals(probability)) {
+ return NEVER_SAMPLE;
+ }
+ if ("1".equals(probability) || "1.0".equals(probability) || "1.0.0".equals(probability)) {
+ return ALWAYS_SAMPLE;
+ }
+ float parseProbability = NumberUtils.toFloat(probability, 1);
+ if (parseProbability < 0.01f || parseProbability > 1) {
+ throw new IllegalArgumentException(
+ "probability should be between 0.01 and 1: was " + probability);
+ }
+ return new CountSampler(parseProbability);
+ }
+
+ /**
+ * loops over the pre-canned decisions, resetting to zero when it gets to the end.
+ */
+ @Override
+ public boolean isSampled(final ServerHttpRequest request) {
+ return sampleDecisions.get(mod(counter.getAndIncrement()));
+ }
+
+ /**
+ * Returns a non-negative mod.
+ */
+ private int mod(final int dividend) {
+ int result = dividend % 100;
+ return result >= 0 ? result : 100 + result;
+ }
+
+ /**
+ * gen random bitSet.
+ * reference resources: http://stackoverflow.com/questions/12817946/generate-a-random-bitset-with-n-1s
+ *
+ * @param size bitmap size
+ * @param cardinality cardinality
+ * @return bitSet
+ */
+ private BitSet genRandomBitSet(final int size, final int cardinality) {
+ BitSet result = new BitSet(size);
+ int[] chosen = new int[cardinality];
+ int i;
+ for (i = 0; i < cardinality; ++i) {
+ chosen[i] = i;
+ result.set(i);
+ }
+ Random random = new Random();
+ for (; i < size; ++i) {
+ int j = random.nextInt(i + 1);
+ if (j < cardinality) {
+ result.clear(chosen[j]);
+ result.set(i);
+ chosen[j] = i;
+ }
+ }
+ return result;
+ }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/Sampler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/Sampler.java
new file mode 100644
index 000000000..d724b757c
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/sampler/Sampler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.sampler;
+
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+/**
+ * sampler interface.
+ */
+public interface Sampler {
+
+ Sampler ALWAYS_SAMPLE = request -> true;
+ Sampler NEVER_SAMPLE = request -> false;
+
+ /**
+ * judge a ServerHttpRequest should be sample.
+ *
+ * @param request request
+ * @return whether sample
+ */
+ boolean isSampled(ServerHttpRequest request);
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtils.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtils.java
new file mode 100644
index 000000000..b7f061323
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig;
+import org.apache.shenyu.plugin.logging.kafka.sampler.CountSampler;
+import org.apache.shenyu.plugin.logging.kafka.sampler.Sampler;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.util.AntPathMatcher;
+
+/**
+ * log collect config Utils.
+ */
+public final class LogCollectConfigUtils {
+
+ private static final AntPathMatcher MATCHER = new AntPathMatcher();
+
+ private static final LogCollectConfig.GlobalLogConfig DEFAULT_GLOBAL_LOG_CONFIG =
+ new LogCollectConfig.GlobalLogConfig();
+
+ private static LogCollectConfig.GlobalLogConfig globalLogConfig;
+
+ private static Map<String, Sampler> apiSamplerMap = new HashMap<>();
+
+ private static Map<String, String> apiTopicMap = new HashMap<>();
+
+ private static Sampler globalSampler = Sampler.ALWAYS_SAMPLE;
+
+ private LogCollectConfigUtils() {
+ }
+
+ /**
+ * set global config.
+ *
+ * @param config global config
+ */
+ public static void setGlobalConfig(final LogCollectConfig.GlobalLogConfig config) {
+ globalLogConfig = config;
+ }
+
+ /**
+ * set api sample.
+ *
+ * @param uriSampleMap api sample map
+ */
+ public static void setSampler(final Map<String, String> uriSampleMap) {
+ Map<String, Sampler> samplerMap = new HashMap<>();
+ uriSampleMap.forEach((path, sampler) -> {
+ if (StringUtils.isBlank(sampler)) {
+ samplerMap.put(path, globalSampler);
+ } else {
+ samplerMap.put(path, CountSampler.create(sampler));
+ }
+ });
+ apiSamplerMap = samplerMap;
+ }
+
+ /**
+ * set api topic map.
+ *
+ * @param uriTopicMap api topic map
+ */
+ public static void setTopic(final Map<String, String> uriTopicMap) {
+ apiTopicMap = uriTopicMap;
+ }
+
+ /**
+ * set global Sampler.
+ *
+ * @param sampler global sampler
+ */
+ public static void setGlobalSampler(final String sampler) {
+ if (StringUtils.isNotBlank(sampler)) {
+ try {
+ globalSampler = CountSampler.create(sampler);
+ } catch (Exception e) {
+ globalSampler = Sampler.ALWAYS_SAMPLE;
+ }
+ }
+ }
+
+ /**
+ * judge whether sample.
+ *
+ * @param request request
+ * @return whether sample
+ */
+ public static boolean isSampled(final ServerHttpRequest request) {
+ String path = request.getURI().getPath();
+ for (Map.Entry<String, Sampler> entry : apiSamplerMap.entrySet()) {
+ String pattern = entry.getKey();
+ if (MATCHER.match(pattern, path)) {
+ return entry.getValue().isSampled(request);
+ }
+ }
+ if (globalSampler != null) {
+ return globalSampler.isSampled(request);
+ }
+ return true;
+ }
+
+ /**
+ * judge whether request body too large.
+ *
+ * @param bodySize body size
+ * @return whether request body too large
+ */
+ public static boolean isRequestBodyTooLarge(final int bodySize) {
+ if (Objects.isNull(globalLogConfig)) {
+ return false;
+ }
+ return bodySize > globalLogConfig.getMaxRequestBody();
+ }
+
+ /**
+ * judge whether response body too large.
+ *
+ * @param bodySize body size.
+ * @return whether response body too large
+ */
+ public static boolean isResponseBodyTooLarge(final int bodySize) {
+ if (Objects.isNull(globalLogConfig)) {
+ return false;
+ }
+ return bodySize > globalLogConfig.getMaxResponseBody();
+ }
+
+ /**
+ * get global log config.
+ *
+ * @return global log config
+ */
+ public static LogCollectConfig.GlobalLogConfig getGlobalLogConfig() {
+ return Optional.ofNullable(globalLogConfig).orElse(DEFAULT_GLOBAL_LOG_CONFIG);
+ }
+
+ /**
+ * get message queue topic.
+ *
+ * @param path request path
+ * @return topic
+ */
+ public static String getTopic(final String path) {
+ for (Map.Entry<String, String> entry : apiTopicMap.entrySet()) {
+ String pattern = entry.getKey();
+ if (MATCHER.match(pattern, path)) {
+ return entry.getValue();
+ }
+ }
+ return "";
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtils.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtils.java
new file mode 100644
index 000000000..317661416
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/main/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.springframework.http.HttpHeaders;
+
+/**
+ * log collect utils.
+ */
+public class LogCollectUtils {
+
+ private static final List<String> BINARY_TYPE_LIST = Arrays.asList("image", "multipart", "cbor",
+ "octet-stream", "pdf", "javascript", "css", "html");
+
+ /**
+ * judge whether is binary type.
+ *
+ * @param headers request or response header
+ * @return whether binary type
+ */
+ public static boolean isNotBinaryType(final HttpHeaders headers) {
+ return Optional.ofNullable(headers).map(HttpHeaders::getContentType)
+ .map(contentType -> !BINARY_TYPE_LIST.contains(contentType.getType())
+ && !BINARY_TYPE_LIST.contains(contentType.getSubtype()))
+ .orElse(true);
+ }
+
+ /**
+ * get request header string.
+ *
+ * @param headers request headers
+ * @return header string
+ */
+ public static String getHeaders(final HttpHeaders headers) {
+ Map<String, String> map = headers.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.join(",", entry.getValue())));
+ return JsonUtils.toJson(map);
+ }
+
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollectorTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollectorTest.java
new file mode 100644
index 000000000..bbe08226a
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/DefaultLogCollectorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.lang.reflect.Field;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.kafka.KafkaLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For DefaultLogCollector.
+ */
+public class DefaultLogCollectorTest {
+
+ private ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+ @BeforeEach
+ public void setUp() {
+ shenyuRequestLog.setClientIp("0.0.0.0");
+ shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+ }
+
+ @Test
+ public void testAbstractLogCollector() throws Exception {
+ DefaultLogCollector.getInstance().start();
+ Field field1 = AbstractLogCollector.class.getDeclaredField("started");
+ field1.setAccessible(true);
+ Assertions.assertEquals(field1.get(DefaultLogCollector.getInstance()).toString(), "true");
+ DefaultLogCollector.getInstance().collect(shenyuRequestLog);
+ DefaultLogCollector.getInstance().close();
+ Field field2 = AbstractLogCollector.class.getDeclaredField("started");
+ field2.setAccessible(true);
+ Assertions.assertEquals(field2.get(DefaultLogCollector.getInstance()).toString(), "false");
+ }
+
+ @Test
+ public void testGetLogConsumeClient() {
+ LogConsumeClient logConsumeClient = new DefaultLogCollector().getLogConsumeClient();
+ Assertions.assertEquals(KafkaLogCollectClient.class, logConsumeClient.getClass());
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
new file mode 100644
index 000000000..280c77906
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/LoggingKafkaPluginTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka;
+
+import java.net.InetSocketAddress;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.RemoteAddressResolver;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * The Test Case For LoggingKafkaPlugin.
+ */
+@ExtendWith(MockitoExtension.class)
+public final class LoggingKafkaPluginTest {
+
+ private LoggingKafkaPlugin loggingKafkaPlugin;
+
+ private ServerWebExchange exchange;
+
+ private RuleData ruleData;
+
+ private ShenyuPluginChain chain;
+
+ private SelectorData selectorData;
+
+ @BeforeEach
+ public void setUp() {
+ this.loggingKafkaPlugin = new LoggingKafkaPlugin();
+ this.ruleData = Mockito.mock(RuleData.class);
+ this.chain = Mockito.mock(ShenyuPluginChain.class);
+ this.selectorData = Mockito.mock(SelectorData.class);
+ MockServerHttpRequest request = MockServerHttpRequest
+ .get("localhost")
+ .remoteAddress(new InetSocketAddress(8090))
+ .header("X-source", "mock test")
+ .queryParam("queryParam", "Hello,World")
+ .build();
+ ConfigurableApplicationContext context = Mockito.mock(ConfigurableApplicationContext.class);
+ SpringBeanUtils.getInstance().setApplicationContext(context);
+ RemoteAddressResolver remoteAddressResolver = new RemoteAddressResolver() {
+ };
+ Mockito.lenient().when(context.getBean(RemoteAddressResolver.class)).thenReturn(remoteAddressResolver);
+ this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+ ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+ exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+ }
+
+ @Test
+ public void testDoExecute() {
+ Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
+ Mono<Void> result = loggingKafkaPlugin.doExecute(exchange, chain, selectorData, ruleData);
+ StepVerifier.create(result).expectSubscription().verifyComplete();
+ }
+
+ @Test
+ public void testGetOrder() {
+ Assertions.assertEquals(loggingKafkaPlugin.getOrder(), PluginEnum.LOGGING_KAFKA.getCode());
+ }
+
+ @Test
+ public void testNamed() {
+ Assertions.assertEquals(loggingKafkaPlugin.named(), PluginEnum.LOGGING_KAFKA.getName());
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriterTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriterTest.java
new file mode 100644
index 000000000..d46634a0b
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/BodyWriterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For BodyWriter.
+ */
+public class BodyWriterTest {
+
+ private BodyWriter writer;
+
+ private String sendString;
+
+ private ByteBuffer byteBuffer;
+
+ @BeforeEach
+ public void setUp() throws UnsupportedEncodingException {
+ this.writer = new BodyWriter();
+ this.sendString = "hello, shenyu";
+ byteBuffer = ByteBuffer.wrap(sendString.getBytes("UTF-8"));
+ }
+
+ @Test
+ public void testWrite() throws UnsupportedEncodingException {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(sendString.getBytes("UTF-8"));
+ writer.write(byteBuffer.asReadOnlyBuffer());
+ String res = writer.output();
+ Assertions.assertEquals(res, "hello, shenyu");
+ }
+
+ @Test
+ public void testIsEmpty() {
+ Assertions.assertEquals(writer.isEmpty(), true);
+ }
+
+ @Test
+ public void testSize() {
+ writer.write(byteBuffer.asReadOnlyBuffer());
+ int size = writer.size();
+ Assertions.assertEquals(size, 13);
+ }
+
+ @Test
+ public void testOutput() {
+ writer.write(byteBuffer.asReadOnlyBuffer());
+ String res = writer.output();
+ Assertions.assertEquals(res, "hello, shenyu");
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponseTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponseTest.java
new file mode 100644
index 000000000..0414eafeb
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/body/LoggingServerHttpResponseTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.body;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.RemoteAddressResolver;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.result.ShenyuResult;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.apache.shenyu.plugin.base.utils.HostAddressUtils;
+import org.apache.shenyu.plugin.logging.kafka.DefaultLogCollector;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.kafka.utils.LogCollectUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * The Test Case For LoggingServerHttpResponse.
+ */
+public class LoggingServerHttpResponseTest {
+
+ private ShenyuRequestLog requestInfo = new ShenyuRequestLog();
+
+ private ServerWebExchange exchange;
+
+ private LoggingServerHttpResponse loggingServerHttpResponse;
+
+ private ServerHttpRequest serverHttpRequest;
+
+ private String userAgent = "User-Agent";
+
+ private String host = "Host";
+
+ private LocalDateTime startDateTime = LocalDateTime.now();
+
+ @BeforeEach
+ public void setUp() {
+ MockServerHttpRequest request = MockServerHttpRequest
+ .post("localhost")
+ .remoteAddress(new InetSocketAddress(8090))
+ .header("X-source", "mock test")
+ .queryParam("queryParam", "Hello,World")
+ .body("hello");
+ ConfigurableApplicationContext context = Mockito.mock(ConfigurableApplicationContext.class);
+ SpringBeanUtils.getInstance().setApplicationContext(context);
+ RemoteAddressResolver remoteAddressResolver = new RemoteAddressResolver() {
+ };
+ ShenyuResult shenyuResult = new ShenyuResult() {
+ };
+ this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+ Mockito.lenient().when(context.getBean(RemoteAddressResolver.class)).thenReturn(remoteAddressResolver);
+ Mockito.lenient().when(context.getBean(ShenyuResult.class)).thenReturn(shenyuResult);
+ ShenyuContext shenyuContext = new ShenyuContext();
+ shenyuContext.setStartDateTime(startDateTime);
+ exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+ exchange.getAttributes().put(LoggingConstant.SHENYU_AGENT_TRACE_ID, "shenyu-agent-trace-id");
+ exchange.getAttributes().put(Constants.HTTP_DOMAIN, "http://localhost:9195/http/order/path/123/name");
+ this.serverHttpRequest = exchange.getRequest();
+ requestInfo.setRequestUri(serverHttpRequest.getURI().toString());
+ requestInfo.setMethod(serverHttpRequest.getMethodValue());
+ requestInfo.setRequestHeader(LogCollectUtils.getHeaders(serverHttpRequest.getHeaders()));
+ requestInfo.setQueryParams(serverHttpRequest.getURI().getQuery());
+ requestInfo.setClientIp(HostAddressUtils.acquireIp(exchange));
+ requestInfo.setUserAgent(serverHttpRequest.getHeaders().getFirst(userAgent));
+ requestInfo.setHost(serverHttpRequest.getHeaders().getFirst(host));
+ requestInfo.setPath(serverHttpRequest.getURI().getPath());
+ this.loggingServerHttpResponse = new LoggingServerHttpResponse(exchange.getResponse(), requestInfo, DefaultLogCollector.getInstance());
+ }
+
+ @Test
+ public void testSetExchange() throws NoSuchFieldException, IllegalAccessException {
+ loggingServerHttpResponse.setExchange(exchange);
+ Field field = loggingServerHttpResponse.getClass().getDeclaredField("exchange");
+ field.setAccessible(true);
+ Assertions.assertEquals(field.get(loggingServerHttpResponse), exchange);
+ }
+
+ @Test
+ public void testGetTraceId() throws Exception {
+ loggingServerHttpResponse.setExchange(exchange);
+ exchange.getResponse().getHeaders();
+ Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("getTraceId");
+ method.setAccessible(true);
+ String traceId = (String) method.invoke(loggingServerHttpResponse);
+ Assertions.assertEquals(traceId, "shenyu-agent-trace-id");
+ }
+
+ @Test
+ public void testGetUpstreamIp() throws Exception {
+ loggingServerHttpResponse.setExchange(exchange);
+ Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("getUpstreamIp");
+ method.setAccessible(true);
+ String upstreamIp = (String) method.invoke(loggingServerHttpResponse);
+ Assertions.assertEquals(upstreamIp, "localhost");
+ }
+
+ @Test
+ public void testGetUpstreamIpFromHttpDomain() throws Exception {
+ loggingServerHttpResponse.setExchange(exchange);
+ Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("getUpstreamIpFromHttpDomain");
+ method.setAccessible(true);
+ String upstreamIpFromHttpDomain = (String) method.invoke(loggingServerHttpResponse);
+ Assertions.assertEquals(upstreamIpFromHttpDomain, "localhost");
+ }
+
+ @Test
+ public void testLogError() throws NoSuchFieldException, IllegalAccessException {
+ loggingServerHttpResponse.setExchange(exchange);
+ Throwable throwable = new Throwable("error");
+ DefaultLogCollector.getInstance().start();
+ loggingServerHttpResponse.logError(throwable);
+ Field field = loggingServerHttpResponse.getClass().getDeclaredField("logInfo");
+ field.setAccessible(true);
+ ShenyuRequestLog shenyuRequestLog = (ShenyuRequestLog) field.get(loggingServerHttpResponse);
+ Assertions.assertEquals(shenyuRequestLog.getStatus(), 500);
+ }
+
+ @Test
+ public void testLogResponse() throws Exception {
+ DefaultLogCollector.getInstance().start();
+ loggingServerHttpResponse.setExchange(exchange);
+ BodyWriter writer = new BodyWriter();
+ String sendString = "hello, shenyu";
+ ByteBuffer byteBuffer = ByteBuffer.wrap(sendString.getBytes("UTF-8"));
+ writer.write(byteBuffer.asReadOnlyBuffer());
+ Method method = loggingServerHttpResponse.getClass().getDeclaredMethod("logResponse", ShenyuContext.class, BodyWriter.class);
+ method.setAccessible(true);
+ method.invoke(loggingServerHttpResponse, exchange.getAttribute(Constants.CONTEXT), writer);
+ Field field = loggingServerHttpResponse.getClass().getDeclaredField("logInfo");
+ field.setAccessible(true);
+ ShenyuRequestLog log = (ShenyuRequestLog) field.get(loggingServerHttpResponse);
+ Assertions.assertEquals(log.getResponseBody(), "hello, shenyu");
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfigTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfigTest.java
new file mode 100644
index 000000000..4097a9875
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/config/LogCollectConfigTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For LogCollectConfig.
+ */
+public class LogCollectConfigTest {
+
+ private LogCollectConfig logCollectConfig = new LogCollectConfig();
+
+ @Test
+ public void testSetLogApiConfigTopic() {
+ LogCollectConfig.LogApiConfig logApiConfig = new LogCollectConfig.LogApiConfig();
+ logApiConfig.setTopic("test");
+ Assertions.assertEquals(logApiConfig.getTopic(), "test");
+ }
+
+ @Test
+ public void testSetLogApiConfigSampleRate() {
+ LogCollectConfig.LogApiConfig logApiConfig = new LogCollectConfig.LogApiConfig();
+ logApiConfig.setSampleRate("1");
+ Assertions.assertEquals(logApiConfig.getSampleRate(), "1");
+ }
+
+ @Test
+ public void testGetGlobalLogConfigSampleRate() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ globalLogConfig.setSampleRate("1");
+ Assertions.assertEquals(globalLogConfig.getSampleRate(), "1");
+ }
+
+ @Test
+ public void testSetGlobalLogConfigTopic() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ globalLogConfig.setTopic("test");
+ Assertions.assertEquals(globalLogConfig.getTopic(), "test");
+ }
+
+ @Test
+ public void testSetGlobalLogConfigMaxResponseBody() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ globalLogConfig.setMaxResponseBody(5);
+ Assertions.assertEquals(globalLogConfig.getMaxResponseBody(), 5);
+ }
+
+ @Test
+ public void testSetGlobalLogConfigMaxRequestBody() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ globalLogConfig.setMaxRequestBody(5);
+ Assertions.assertEquals(globalLogConfig.getMaxRequestBody(), 5);
+ }
+
+ @Test
+ public void testSetGlobalLogConfigNamesrvAddr() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ globalLogConfig.setNamesrvAddr("test");
+ Assertions.assertEquals(globalLogConfig.getNamesrvAddr(), "test");
+ }
+
+ @Test
+ public void testSetGlobalLogConfigProducerGroup() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ globalLogConfig.setProducerGroup("test");
+ Assertions.assertEquals(globalLogConfig.getProducerGroup(), "test");
+ }
+
+ @Test
+ public void testGetGlobalLogConfig() {
+ LogCollectConfig.GlobalLogConfig globalLogConfig = new LogCollectConfig.GlobalLogConfig();
+ logCollectConfig.setGlobalLogConfig(globalLogConfig);
+ Assertions.assertEquals(logCollectConfig.getGlobalLogConfig(), globalLogConfig);
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressDataTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressDataTest.java
new file mode 100644
index 000000000..4f7132d67
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/LZ4CompressDataTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For LZ4CompressData.
+ */
+public class LZ4CompressDataTest {
+
+ private int length;
+
+ private byte[] compressedData;
+
+ private LZ4CompressData lz4CompressData;
+
+ @BeforeEach
+ public void setUp() {
+ this.length = 5;
+ this.compressedData = new byte[] {'h', 'e', 'l', 'l', 'o'};
+ this.lz4CompressData = new LZ4CompressData(length, compressedData);
+ }
+
+ @Test
+ public void testGetLength() {
+ lz4CompressData.setLength(6);
+ Assertions.assertEquals(lz4CompressData.getLength(), 6);
+ }
+
+ @Test
+ public void testGetCompressedData() {
+ byte[] bytes = new byte[] {'h', 'e', 'l', 'l', 'o', '!'};
+ lz4CompressData.setCompressedData(bytes);
+ Assertions.assertEquals(lz4CompressData.getCompressedData(), bytes);
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLogTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLogTest.java
new file mode 100644
index 000000000..dc512dee4
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/entity/ShenyuRequestLogTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.entity;
+
+import java.time.LocalDateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For ShenyuRequestLog.
+ */
+public class ShenyuRequestLogTest {
+
+ private ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+ @Test
+ public void testGetModule() {
+ shenyuRequestLog.setModule("test");
+ Assertions.assertEquals(shenyuRequestLog.getModule(), "test");
+ }
+
+ @Test
+ public void testResponseContentLength() {
+ shenyuRequestLog.setResponseContentLength(5);
+ Assertions.assertEquals(shenyuRequestLog.getResponseContentLength(), 5);
+ }
+
+ @Test
+ public void testGetUserAgent() {
+ shenyuRequestLog.setUserAgent("test");
+ Assertions.assertEquals(shenyuRequestLog.getUserAgent(), "test");
+ }
+
+ @Test
+ public void testGetHost() {
+ shenyuRequestLog.setHost("test");
+ Assertions.assertEquals(shenyuRequestLog.getHost(), "test");
+ }
+
+ @Test
+ public void testGetClientIp() {
+ shenyuRequestLog.setClientIp("0.0.0.0");
+ Assertions.assertEquals(shenyuRequestLog.getClientIp(), "0.0.0.0");
+ }
+
+ @Test
+ public void testGetTimeLocal() {
+ LocalDateTime timeLocal = LocalDateTime.now();
+ shenyuRequestLog.setTimeLocal(timeLocal.toString());
+ Assertions.assertEquals(shenyuRequestLog.getTimeLocal(), timeLocal.toString());
+ }
+
+ @Test
+ public void testGetMethod() {
+ shenyuRequestLog.setMethod("test");
+ Assertions.assertEquals(shenyuRequestLog.getMethod(), "test");
+ }
+
+ @Test
+ public void testGetRequestBody() {
+ shenyuRequestLog.setRequestBody("hello");
+ Assertions.assertEquals(shenyuRequestLog.getRequestBody(), "hello");
+ }
+
+ @Test
+ public void testGetUpstreamIp() {
+ shenyuRequestLog.setUpstreamIp("0.0.0.0");
+ Assertions.assertEquals(shenyuRequestLog.getUpstreamIp(), "0.0.0.0");
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
new file mode 100644
index 000000000..e869a927f
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/handler/LoggingKafkaPluginDataHandlerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.handler;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.plugin.logging.kafka.kafka.KafkaLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For LoggingKafkaPluginDataHandler.
+ */
+public class LoggingKafkaPluginDataHandlerTest {
+
+ private LoggingKafkaPluginDataHandler loggingKafkaPluginDataHandler;
+
+ private SelectorData selectorData = new SelectorData();
+
+ private ConditionData conditionData = new ConditionData();
+
+ private PluginData pluginData = new PluginData();
+
+ @BeforeEach
+ public void setUp() {
+ this.loggingKafkaPluginDataHandler = new LoggingKafkaPluginDataHandler();
+ selectorData.setId("1");
+ selectorData.setType(1);
+ selectorData.setHandle("{\"topic\":\"test\", \"sampleRate\":\"1\"}");
+ conditionData.setParamName("id");
+ conditionData.setParamType("uri");
+ conditionData.setParamValue("11");
+ conditionData.setOperator("=");
+ List<ConditionData> list = new ArrayList<>();
+ list.add(conditionData);
+ selectorData.setConditionList(list);
+ pluginData.setEnabled(true);
+ pluginData.setConfig("{\"topic\":\"test\", \"namesrvAddr\":\"localhost:8082\"}");
+ }
+
+ @Test
+ public void testHandlerPlugin() throws NoSuchFieldException, IllegalAccessException {
+ loggingKafkaPluginDataHandler.handlerPlugin(pluginData);
+ Field field = loggingKafkaPluginDataHandler.getClass().getDeclaredField("KAFKA_LOG_COLLECT_CLIENT");
+ field.setAccessible(true);
+ Assertions.assertEquals(field.get(loggingKafkaPluginDataHandler).getClass(), KafkaLogCollectClient.class);
+ }
+
+ @Test
+ public void testHandlerSelector() throws NoSuchFieldException, IllegalAccessException {
+ loggingKafkaPluginDataHandler.handlerSelector(selectorData);
+ Field field1 = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+ field1.setAccessible(true);
+ Field field2 = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+ field2.setAccessible(true);
+ Assertions.assertEquals(field1.get("1").toString(), "{1=[11]}");
+ Assertions.assertNotEquals(field2.get("1").toString(), "{}");
+ }
+
+ @Test
+ public void testRemoveSelector() throws NoSuchFieldException, IllegalAccessException {
+ loggingKafkaPluginDataHandler.handlerSelector(selectorData);
+ Field field1 = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+ field1.setAccessible(true);
+ Field field2 = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+ field2.setAccessible(true);
+ Assertions.assertEquals(field1.get("1").toString(), "{1=[11]}");
+ Assertions.assertNotEquals(field2.get("1").toString(), "{}");
+ loggingKafkaPluginDataHandler.removeSelector(selectorData);
+ Field field3 = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_ID_URI_LIST_MAP");
+ field3.setAccessible(true);
+ Field field4 = loggingKafkaPluginDataHandler.getClass().getDeclaredField("SELECT_API_CONFIG_MAP");
+ field4.setAccessible(true);
+ Assertions.assertEquals(field3.get("1").toString(), "{}");
+ Assertions.assertEquals(field4.get("1").toString(), "{}");
+ }
+
+ @Test
+ public void testPluginNamed() {
+ Assertions.assertEquals(loggingKafkaPluginDataHandler.pluginNamed(), "loggingKafka");
+ }
+
+ @Test
+ public void testGetKafkaLogCollectClient() {
+ Assertions.assertEquals(loggingKafkaPluginDataHandler.getKafkaLogCollectClient().getClass(), KafkaLogCollectClient.class);
+ }
+
+ @Test
+ public void testGetSelectIdUriListMap() {
+ Assertions.assertEquals(LoggingKafkaPluginDataHandler.getSelectIdUriListMap().getClass(), ConcurrentHashMap.class);
+ }
+
+ @Test
+ public void testGetSelectApiConfigMap() {
+ Assertions.assertEquals(LoggingKafkaPluginDataHandler.getSelectApiConfigMap().getClass(), ConcurrentHashMap.class);
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClientTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClientTest.java
new file mode 100644
index 000000000..4092282c5
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/kafka/KafkaLogCollectClientTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.kafka;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig.GlobalLogConfig;
+import org.apache.shenyu.plugin.logging.kafka.constant.LoggingConstant;
+import org.apache.shenyu.plugin.logging.kafka.entity.ShenyuRequestLog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The Test Case For RocketMQLogCollectClient.
+ */
+public class KafkaLogCollectClientTest {
+
+ private final Properties props = new Properties();
+
+ private final PluginData pluginData = new PluginData();
+
+ private final List<ShenyuRequestLog> logs = new ArrayList<>();
+
+ private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+ private KafkaLogCollectClient kafkaLogCollectClient;
+
+ private GlobalLogConfig globalLogConfig;
+
+ @BeforeEach
+ public void setUp() {
+ this.kafkaLogCollectClient = new KafkaLogCollectClient();
+ pluginData.setEnabled(true);
+ pluginData.setConfig("{\"topic\":\"shenyu-access-logging\", \"namesrvAddr\":\"localhost:8082\"}");
+ globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
+ GlobalLogConfig.class);
+ globalLogConfig.setCompressAlg("LZ4");
+ props.put("bootstrap.servers", globalLogConfig.getNamesrvAddr());
+ props.put(LoggingConstant.NAMESERVER_ADDRESS, globalLogConfig.getNamesrvAddr());
+ props.setProperty(LoggingConstant.TOPIC, globalLogConfig.getTopic());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ shenyuRequestLog.setClientIp("0.0.0.0");
+ shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+ logs.add(shenyuRequestLog);
+ }
+
+ @Test
+ public void testInitProducer() throws NoSuchFieldException, IllegalAccessException {
+ kafkaLogCollectClient.initProducer(props);
+ Field field = kafkaLogCollectClient.getClass().getDeclaredField("topic");
+ field.setAccessible(true);
+ Assertions.assertEquals(field.get(kafkaLogCollectClient), "shenyu-access-logging");
+ kafkaLogCollectClient.close();
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSamplerTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSamplerTest.java
new file mode 100644
index 000000000..917252d07
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/sampler/CountSamplerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.sampler;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.BitSet;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * The Test Case For CountSampler.
+ */
+public class CountSamplerTest {
+
+ private CountSampler countSampler;
+
+ private ServerHttpRequest request;
+
+ private ServerWebExchange exchange;
+
+ @BeforeEach
+ public void setUp() {
+ this.countSampler = new CountSampler(1);
+ MockServerHttpRequest request = MockServerHttpRequest
+ .get("localhost")
+ .remoteAddress(new InetSocketAddress(8090))
+ .header("X-source", "mock test")
+ .queryParam("queryParam", "Hello,World")
+ .build();
+ this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+ ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+ exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+ this.request = exchange.getRequest();
+ }
+
+ @Test
+ public void testIsSampled() {
+ Assertions.assertEquals(countSampler.isSampled(request), true);
+ }
+
+ @Test
+ public void testMod() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method method = countSampler.getClass().getDeclaredMethod("mod", int.class);
+ method.setAccessible(true);
+ int res = (int) method.invoke(countSampler, 1);
+ Assertions.assertEquals(res, 1);
+ }
+
+ @Test
+ public void testGenRandomBitSet() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method method = countSampler.getClass().getDeclaredMethod("genRandomBitSet", int.class, int.class);
+ method.setAccessible(true);
+ BitSet bitSet = (BitSet) method.invoke(countSampler, 1, 1);
+ BitSet res = new BitSet(1);
+ res.set(0);
+ Assertions.assertEquals(bitSet, res);
+ }
+
+ @Test
+ public void testCreate() {
+ Assertions.assertEquals(CountSampler.create(""), Sampler.ALWAYS_SAMPLE);
+ Assertions.assertEquals(CountSampler.create("0"), Sampler.NEVER_SAMPLE);
+ Assertions.assertEquals(CountSampler.create("1"), Sampler.ALWAYS_SAMPLE);
+ Assertions.assertEquals(CountSampler.create("0.5").getClass(), CountSampler.class);
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtilsTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtilsTest.java
new file mode 100644
index 000000000..33dac94a1
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectConfigUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.logging.kafka.config.LogCollectConfig.GlobalLogConfig;
+import org.apache.shenyu.plugin.logging.kafka.sampler.Sampler;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * The Test Case For LogCollectConfigUtils.
+ */
+
+public class LogCollectConfigUtilsTest {
+
+ private GlobalLogConfig config = new GlobalLogConfig();
+
+ private ServerWebExchange exchange;
+
+ private ServerHttpRequest request;
+
+ private Map<String, String> uriSampleMap = new HashMap<>();
+
+ private Map<String, String> apiTopicMap = new HashMap<>();
+
+ @BeforeEach
+ public void setUp() {
+ config.setBufferQueueSize(5000);
+ LogCollectConfigUtils.setGlobalConfig(config);
+ uriSampleMap.put("const", "1");
+ apiTopicMap.put("topic", "shenyu-access-logging");
+ MockServerHttpRequest request = MockServerHttpRequest
+ .get("localhost")
+ .remoteAddress(new InetSocketAddress(8090))
+ .header("X-source", "mock test")
+ .queryParam("queryParam", "Hello,World")
+ .build();
+ this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+ ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+ exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+ this.request = exchange.getRequest();
+ }
+
+ @Test
+ public void testGetGlobalConfig() {
+ GlobalLogConfig globalLogConfig = LogCollectConfigUtils.getGlobalLogConfig();
+ assertEquals(globalLogConfig.getClass(), GlobalLogConfig.class);
+ }
+
+ @Test
+ public void testSetGlobalConfig() {
+ assertEquals(LogCollectConfigUtils.getGlobalLogConfig().getBufferQueueSize(), 5000);
+ }
+
+ @Test
+ public void testSetSampler() throws IllegalAccessException, NoSuchFieldException {
+ LogCollectConfigUtils.setSampler(uriSampleMap);
+ Field field = LogCollectConfigUtils.class.getDeclaredField("apiSamplerMap");
+ field.setAccessible(true);
+ Assertions.assertEquals(field.get("const").toString(), "{const=" + Sampler.ALWAYS_SAMPLE + "}");
+ }
+
+ @Test
+ public void testIsSampled() {
+ assertEquals(LogCollectConfigUtils.isSampled(request), false);
+ }
+
+ @Test
+ public void testIsRequestBodyTooLarge() {
+ assertEquals(LogCollectConfigUtils.isRequestBodyTooLarge(524289), true);
+ assertEquals(LogCollectConfigUtils.isRequestBodyTooLarge(524288), false);
+ }
+
+ @Test
+ public void testIsResponseBodyTooLarge() {
+ assertEquals(LogCollectConfigUtils.isResponseBodyTooLarge(524289), true);
+ assertEquals(LogCollectConfigUtils.isResponseBodyTooLarge(524288), false);
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(LogCollectConfigUtils.getTopic("topic"), "");
+ }
+
+ @Test
+ public void testSetTopic() {
+ LogCollectConfigUtils.setTopic(apiTopicMap);
+ assertEquals(LogCollectConfigUtils.getTopic("topic"), "shenyu-access-logging");
+ }
+
+ @Test
+ public void testSetGlobalSampler() throws NoSuchFieldException, IllegalAccessException {
+ LogCollectConfigUtils.setGlobalSampler("0");
+ Field field = LogCollectConfigUtils.class.getDeclaredField("globalSampler");
+ field.setAccessible(true);
+ assertEquals(field.get("const"), Sampler.NEVER_SAMPLE);
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtilsTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtilsTest.java
new file mode 100644
index 000000000..25482bd91
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-kafka/src/test/java/org/apache/shenyu/plugin/logging/kafka/utils/LogCollectUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.kafka.utils;
+
+import java.net.InetSocketAddress;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * The Test Case For LogCollectUtils.
+ */
+public class LogCollectUtilsTest {
+
+ private ServerWebExchange exchange;
+
+ private ServerHttpRequest request;
+
+ @BeforeEach
+ public void setUp() {
+ MockServerHttpRequest request = MockServerHttpRequest
+ .get("localhost")
+ .remoteAddress(new InetSocketAddress(8090))
+ .header("X-source", "mock test")
+ .queryParam("queryParam", "Hello,World")
+ .build();
+ this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+ ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+ exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+ this.request = exchange.getRequest();
+ }
+
+ @Test
+ public void testIsNotBinaryType() {
+ assertEquals(LogCollectUtils.isNotBinaryType(request.getHeaders()), true);
+ }
+
+ @Test
+ public void testGetHeaders() {
+ assertEquals(LogCollectUtils.getHeaders(request.getHeaders()), "{\"X-source\":\"mock test\"}");
+ }
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
index 3acd0a769..e1524a5a2 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
@@ -59,6 +59,7 @@
<module>shenyu-spring-boot-starter-plugin-mqtt</module>
<module>shenyu-spring-boot-starter-plugin-metrics</module>
<module>shenyu-spring-boot-starter-plugin-logging-rocketmq</module>
+ <module>shenyu-spring-boot-starter-plugin-logging-kafka</module>
<module>shenyu-spring-boot-starter-plugin-logging-elasticsearch</module>
<module>shenyu-spring-boot-starter-plugin-cache</module>
<module>shenyu-spring-boot-starter-plugin-mock</module>
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/pom.xml
similarity index 51%
copy from shenyu-plugin/shenyu-plugin-logging/pom.xml
copy to shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/pom.xml
index beb8864ef..76840602e 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/pom.xml
@@ -16,21 +16,31 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.shenyu</groupId>
- <artifactId>shenyu-plugin</artifactId>
+ <artifactId>shenyu-spring-boot-starter-plugin</artifactId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>shenyu-plugin-logging</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>shenyu-plugin-logging-console</module>
- <module>shenyu-plugin-logging-rocketmq</module>
- <module>shenyu-plugin-logging-elasticsearch</module>
- </modules>
+ <artifactId>shenyu-spring-boot-starter-plugin-logging-kafka</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-logging-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
</project>
\ No newline at end of file
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfiguration.java
new file mode 100644
index 000000000..6c5d12b17
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.springboot.starter.plugin.logging.kafka;
+
+import org.apache.shenyu.plugin.api.ShenyuPlugin;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.kafka.LoggingKafkaPlugin;
+import org.apache.shenyu.plugin.logging.kafka.handler.LoggingKafkaPluginDataHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * config logging Kafka plugin.
+ */
+@Configuration
+@ConditionalOnProperty(value = {"shenyu.plugins.logging-kafka.enabled"}, havingValue = "true", matchIfMissing = true)
+public class LoggingKafkaPluginConfiguration {
+
+ /**
+ * logging Kafka plugin data handler.
+ * @return logging Kafka PluginDataHandler
+ */
+ @Bean
+ public PluginDataHandler loggingKafkaPluginDataHandler() {
+ return new LoggingKafkaPluginDataHandler();
+ }
+
+ /**
+ * Logging Kafka plugin.
+ * @return LoggingKafkaPlugin
+ */
+ @Bean
+ public ShenyuPlugin loggingKafkaPlugin() {
+ return new LoggingKafkaPlugin();
+ }
+
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.factories b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..3c94c46ab
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.shenyu.springboot.starter.plugin.logging.kafka.LoggingKafkaPluginConfiguration
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.provides b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.provides
new file mode 100644
index 000000000..8bf0b1e58
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/main/resources/META-INF/spring.provides
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+provides: shenyu-spring-boot-starter-plugin-logging-kafka
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/test/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfigurationTest.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/test/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfigurationTest.java
new file mode 100644
index 000000000..75e005ad2
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-kafka/src/test/java/org/apache/shenyu/springboot/starter/plugin/logging/kafka/LoggingKafkaPluginConfigurationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.springboot.starter.plugin.logging.kafka;
+
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.ShenyuPlugin;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.context.annotation.Configuration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * logging kafka plugin config test.
+ */
+@Configuration
+@EnableConfigurationProperties
+public class LoggingKafkaPluginConfigurationTest {
+
+ private ApplicationContextRunner applicationContextRunner;
+
+ @BeforeEach
+ public void before() {
+ applicationContextRunner = new ApplicationContextRunner()
+ .withConfiguration(AutoConfigurations.of(LoggingKafkaPluginConfiguration.class))
+ .withBean(LoggingKafkaPluginConfigurationTest.class);
+ }
+
+ @Test
+ public void testLoggingKafkaPlugin() {
+ applicationContextRunner
+ .withPropertyValues(
+ "debug=true",
+ "shenyu.logging.kafka.enabled=true"
+ )
+ .run(context -> {
+ PluginDataHandler pluginDataHandler = context.getBean("loggingKafkaPluginDataHandler", PluginDataHandler.class);
+ assertNotNull(pluginDataHandler);
+
+ ShenyuPlugin plugin = context.getBean("loggingKafkaPlugin", ShenyuPlugin.class);
+ assertNotNull(plugin);
+ assertThat(plugin.named()).isEqualTo(PluginEnum.LOGGING_KAFKA.getName());
+ });
+ }
+}