You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by xi...@apache.org on 2022/09/01 03:05:39 UTC

[shenyu] branch master updated: [type:feature] Shenyu add logging-pulsar plugin (#3788)

This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new fb72cab93 [type:feature] Shenyu add logging-pulsar plugin  (#3788)
fb72cab93 is described below

commit fb72cab937a06c4d5d550b796ef3719ea206aef8
Author: Bowen Li <27...@qq.com>
AuthorDate: Thu Sep 1 11:05:32 2022 +0800

    [type:feature] Shenyu add logging-pulsar plugin  (#3788)
    
    * shenyu logging add pulsar
    
    * shenyu logging add pulsar test
    
    * fix checkstyle
    
    * add license and refactor pom.xml
    
    * update version to 2.5.1
    
    * improve sql init
---
 db/init/mysql/schema.sql                           |   7 +
 db/init/oracle/schema.sql                          |  18 +++
 db/init/pg/create-table.sql                        |   8 ++
 db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql           |  28 ++++
 db/upgrade/2.5.0-upgrade-2.5.1-pg.sql              |  28 ++++
 pom.xml                                            |   1 +
 .../src/main/resources/sql-script/h2/schema.sql    |   8 +-
 shenyu-bootstrap/pom.xml                           |   8 ++
 .../org/apache/shenyu/common/enums/PluginEnum.java |   5 +
 .../src/main/release-docs/LICENSE                  |   1 +
 shenyu-plugin/shenyu-plugin-logging/pom.xml        |   1 +
 .../common/constant/GenericLoggingConstant.java    |   5 +
 .../shenyu-plugin-logging-pulsar/pom.xml           |  58 ++++++++
 .../plugin/logging/pulsar/LoggingPulsarPlugin.java |  70 +++++++++
 .../pulsar/client/PulsarLogCollectClient.java      | 126 +++++++++++++++++
 .../pulsar/collector/PulsarLogCollector.java       |  42 ++++++
 .../pulsar/config/PulsarLogCollectConfig.java      | 157 +++++++++++++++++++++
 .../handler/LoggingPulsarPluginDataHandler.java    |  95 +++++++++++++
 .../logging/pulsar/LoggingPulsarPluginTest.java    | 103 ++++++++++++++
 .../pulsar/client/PulsarLogCollectClientTest.java  |  74 ++++++++++
 .../pulsar/collector/PulsarLogCollectorTest.java   |  58 ++++++++
 .../pulsar/config/PulsarCollectConfigTest.java     |  95 +++++++++++++
 .../LoggingPulsarPluginDataHandlerTest.java        |  78 ++++++++++
 .../shenyu-spring-boot-starter-plugin/pom.xml      |   1 +
 .../pom.xml                                        |  34 +++--
 .../pulsar/LoggingPulsarPluginConfiguration.java   |  54 +++++++
 .../src/main/resources/META-INF/spring.factories   |  19 +++
 .../src/main/resources/META-INF/spring.provides    |  18 +++
 28 files changed, 1187 insertions(+), 13 deletions(-)

diff --git a/db/init/mysql/schema.sql b/db/init/mysql/schema.sql
index 9033b7727..d592ffd33 100644
--- a/db/init/mysql/schema.sql
+++ b/db/init/mysql/schema.sql
@@ -596,6 +596,7 @@ INSERT INTO `plugin` VALUES ('9', 'hystrix', NULL, 'FaultTolerance', 130, 0, '20
 INSERT INTO `plugin` VALUES ('32', 'loggingElasticSearch','{\"host\":\"localhost\", \"port\": \"9200\"}', 'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00');
 INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"host\":\"localhost\", \"port\": \"9092\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00');
 INSERT INTO `plugin` VALUES ('34', 'loggingAliyunSls','{\"projectName\": \"shenyu\", \"logStoreName\": \"shenyu-logstore\", \"topic\": \"shenyu-topic\"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', '{\"topic":\"shenyu-access-logging\", \"serviceUrl\": \"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 
 -- ----------------------------
 -- Table structure for plugin_handle
@@ -811,6 +812,12 @@ INSERT INTO `plugin_handle` VALUES ('1529402613204172902', '34', 'sampleRate', '
 INSERT INTO `plugin_handle` VALUES ('1529402613204172903', '34', 'maxRequestBody', 'maxRequestBody', 1, 3, 11, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 INSERT INTO `plugin_handle` VALUES ('1529402613204172904', '34', 'maxResponseBody', 'maxResponseBody', 1, 3, 12, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 INSERT INTO `plugin_handle` VALUES ('1529402613204172905', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{\"required\":\"0\",\"defaultValue\":50000}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172906', '35', 'topic', 'topic', 2, 3, 1, '{\"required\":\"1\",\"defaultValue\":\"shenyu-access-logging\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172907', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{\"required":"1",\"defaultValue\":\"pulsar://localhost:6650\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172908', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{\"required":"0",\"defaultValue\":\"1\",\"placeholder\":\"optional,0,0.01~1\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172909', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172910', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172911', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{\"required\":\"0\",\"defaultValue\":\"none\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 -- ----------------------------
 -- Table structure for resource
 -- ----------------------------
diff --git a/db/init/oracle/schema.sql b/db/init/oracle/schema.sql
index e04e08369..44be01687 100644
--- a/db/init/oracle/schema.sql
+++ b/db/init/oracle/schema.sql
@@ -854,6 +854,7 @@ INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, role
 INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('32', 'loggingElasticSearch', '{"host":"localhost", "port": "9200"}', 'Logging', 190, 0);
 INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('33', 'loggingKafka', '{"topic":"shenyu-access-logging", "namesrvAddr": "localhost:9092"}', 'Logging', 180, 0);
 INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('34', 'loggingAliyunSls', '{"projectName": "shenyu", "logStoreName": "shenyu-logstore", "topic": "shenyu-topic"}', 'Logging', 175, '0');
+INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('35', 'loggingPulsar', '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', 'Logging', 185, '0')
 
 
 
@@ -1427,6 +1428,23 @@ values ('1518229897214468196', '34', 'maxResponseBody', 'maxResponseBody', 1, 3,
 insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
 values ('1518229897214468197', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{"required":"0","defaultValue":524288,"placeholder":""}');
 
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468198', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468199', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468200', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468201', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468202', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468203', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}');
 
 
 /** insert resource for resource */
diff --git a/db/init/pg/create-table.sql b/db/init/pg/create-table.sql
index bfe3004c1..ca07fb578 100644
--- a/db/init/pg/create-table.sql
+++ b/db/init/pg/create-table.sql
@@ -673,6 +673,8 @@ INSERT INTO "public"."plugin" VALUES ('31', 'mock', null, 'Mock', 1, 0, '2022-06
 INSERT INTO "public"."plugin" VALUES ('32', 'loggingElasticSearch', '{"host":"localhost", "port": "9200"}', 'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00');
 INSERT INTO "public"."plugin" VALUES ('33', 'loggingKafka', '{"topic":"shenyu-access-logging", "namesrvAddr": "localhost:9092"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-04 22:00:00');
 INSERT INTO "public"."plugin" VALUES ('34', 'loggingAliyunSls', '{"projectName": "shenyu", "logStoreName": "shenyu-logstore", "topic": "shenyu-topic"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin" VALUES ('35', 'loggingPulsar', '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', 'Logging', 185, 0, '2022-05-25 18:08:01', '2022-05-25 18:08:01');
+
 
 -- ----------------------------
 -- Table structure for plugin_handle
@@ -899,6 +901,12 @@ INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524968', '34', 'sampl
 INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524969', '34', 'maxRequestBody', 'maxRequestBody', 1, 3, 11, '{"required":"0","defaultValue":524288,"placeholder":""}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524970', '34', 'maxResponseBody', 'maxResponseBody', 1, 3, 12, '{"required":"0","defaultValue":524288,"placeholder":""}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524971', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{"required":"0","defaultValue":50000,"placeholder":""}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524972', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524973', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524974', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524975', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524976', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524977', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
 
 -- ----------------------------
 -- Table structure for resource
diff --git a/db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql b/db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql
new file mode 100644
index 000000000..4a5e015ec
--- /dev/null
+++ b/db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- this file works for MySQL.
+
+/* insert plugin for loggingPulsar */
+INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', '{\"topic":\"shenyu-access-logging\", \"serviceUrl\": \"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+
+/* insert plugin_handle data for plugin loggingPulsar */
+INSERT INTO `plugin_handle` VALUES ('1529402613204172906', '35', 'topic', 'topic', 2, 3, 1, '{\"required\":\"1\",\"defaultValue\":\"shenyu-access-logging\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172907', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{\"required":"1",\"defaultValue\":\"pulsar://localhost:6650\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172908', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{\"required":"0",\"defaultValue\":\"1\",\"placeholder\":\"optional,0,0.01~1\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172909', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172910', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172911', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{\"required\":\"0\",\"defaultValue\":\"none\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
diff --git a/db/upgrade/2.5.0-upgrade-2.5.1-pg.sql b/db/upgrade/2.5.0-upgrade-2.5.1-pg.sql
new file mode 100644
index 000000000..6f459fe9c
--- /dev/null
+++ b/db/upgrade/2.5.0-upgrade-2.5.1-pg.sql
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- this file works for PostgreSQL, can not use "`" syntax.
+
+/* insert plugin for loggingPulsar */
+INSERT INTO "public"."plugin" VALUES ('35', 'loggingPulsar', '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', 'Logging', 185, 0, '2022-05-25 18:08:01', '2022-05-25 18:08:01');
+
+/* insert plugin_handle data for plugin loggingPulsar */
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524972', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524973', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524974', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524975', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524976', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524977', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
diff --git a/pom.xml b/pom.xml
index a3f57ebfd..b75250b57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@
         <spring-security.version>5.3.10.RELEASE</spring-security.version>
         <grpc.version>1.33.1</grpc.version>
         <rocketmq-client.version>4.9.3</rocketmq-client.version>
+        <pulsar-client.version>2.10.1</pulsar-client.version>
         <lz4-java.version>1.8.0</lz4-java.version>
         <elasticsearch-java.version>8.2.3</elasticsearch-java.version>
         <jackson-databind.version>2.12.3</jackson-databind.version>
diff --git a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
index efc542ab7..1100f6ca0 100755
--- a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
@@ -389,6 +389,7 @@ INSERT IGNORE INTO `plugin` (`id`, `name`, `config`, `role`, `sort`, `enabled`)
 INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('32', 'loggingElasticSearch', 'Logging', 190,'{"host":"localhost", "port": "9200"}', '0');
 INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('33', 'loggingKafka', 'Logging', 180,'{"topic":"shenyu-access-logging", "namesrvAddr": "localhost:9092"}', '0');
 INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('34', 'loggingAliyunSls', 'Logging', 175, '{"projectName": "shenyu", "logStoreName": "shenyu-logstore", "topic": "shenyu-topic"}', '0');
+INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('35', 'loggingPulsar', 'Logging', 185, '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', '0');
 
 
 /*insert plugin_handle data for sentinel*/
@@ -586,7 +587,12 @@ INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,
 INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172903', '34', 'maxRequestBody', 'maxRequestBody', 1, 3, 11, '{"required":"0","defaultValue":524288,"placeholder":""}');
 INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172904', '34', 'maxResponseBody', 'maxResponseBody', 1, 3, 12, '{"required":"0","defaultValue":524288,"placeholder":""}');
 INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172905', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{"required":"0","defaultValue":50000,"placeholder":""}');
-
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172906', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172907', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172908', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172909', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172910', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172911', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}');
 
 /** insert resource for resource */
 INSERT IGNORE INTO `resource` (`id`, `parent_id`, `title`, `name`, `url`, `component`, `resource_type`, `sort`, `icon`, `is_leaf`, `is_route`, `perms`, `status`) VALUES('1346775491550474240','','SHENYU.MENU.PLUGIN.LIST','plug','/plug','PluginList','0','0','dashboard','0','0','','1');
diff --git a/shenyu-bootstrap/pom.xml b/shenyu-bootstrap/pom.xml
index 8639552e0..5490ba43e 100644
--- a/shenyu-bootstrap/pom.xml
+++ b/shenyu-bootstrap/pom.xml
@@ -495,6 +495,14 @@
         </dependency>
         <!-- shenyu logging-aliyunsls plugin end -->
 
+        <!-- shenyu logging-pulsar plugin start -->
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-spring-boot-starter-plugin-logging-pulsar</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- shenyu logging-pulsar plugin end -->
+
     </dependencies>
     <profiles>
         <profile>
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
index 6431fafba..da773d0e2 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
@@ -147,6 +147,11 @@ public enum PluginEnum {
      */
     LOGGING_KAFKA(180, 0, "loggingKafka"),
 
+    /**
+     * Logging Pulsar plugin enum.
+     */
+    LOGGING_PULSAR(185, 0, "loggingPulsar"),
+
     /**
      * Logging ElasticSearch plugin enum.
      */
diff --git a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
index 0cb40678b..7df288262 100644
--- a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
+++ b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
@@ -507,6 +507,7 @@ The text of each license is the standard Apache 2.0 license.
     kafka-clients 3.2.0: https://kafka.apache.org/, Apache 2.0
     elasticsearch-java 8.2.3: https://github.com/elastic/elasticsearch-java, Apache 2.0
     elasticsearch-rest-client 8.2.3: https://github.com/elastic/elasticsearch, Apache 2.0
+    pulsar-client 2.10.1: https://github.com/apache/pulsar, Apache 2.0
     aliyun-log 0.6.57: https://github.com/aliyun/aliyun-log-java-producer, Apache 2.0
     aliyun-log-producer 0.3.10: https://github.com/aliyun/aliyun-log-java-producer, Apache 2.0
 
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-plugin/shenyu-plugin-logging/pom.xml
index 67a1cf229..f9187f919 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-logging/pom.xml
@@ -33,5 +33,6 @@
         <module>shenyu-plugin-logging-kafka</module>
         <module>shenyu-plugin-logging-elasticsearch</module>
         <module>shenyu-plugin-logging-aliyun-sls</module>
+        <module>shenyu-plugin-logging-pulsar</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
index 1204a6250..14d7fcb04 100644
--- a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
@@ -111,6 +111,11 @@ public class GenericLoggingConstant {
      * The constant NAMESERVER_ADDRESS.
      */
     public static final String NAMESERVER_ADDRESS = "namesrvAddr";
+
+    /**
+     * The constant SERVICE_URL.
+     */
+    public static final String SERVICE_URL = "serviceUrl";
     
     /**
      * The constant PRODUCER_GROUP.
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/pom.xml b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/pom.xml
new file mode 100644
index 000000000..1b7d5cbda
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.shenyu</groupId>
+        <artifactId>shenyu-plugin-logging</artifactId>
+        <version>2.5.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>shenyu-plugin-logging-pulsar</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-plugin-logging-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${pulsar-client.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>${lz4-java.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
new file mode 100644
index 000000000..ce02b3b3a
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar;
+
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+/**
+ * Integrated pulsar collect log.
+ */
+public class LoggingPulsarPlugin extends AbstractLoggingPlugin {
+
+    @Override
+    public Mono<Void> doLogExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain,
+                                   final SelectorData selector, final RuleData rule,
+                                   final ServerHttpRequest request, final ShenyuRequestLog requestInfo) {
+        LoggingServerHttpRequest loggingServerHttpRequest = new LoggingServerHttpRequest(request, requestInfo);
+        LoggingServerHttpResponse loggingServerHttpResponse = new LoggingServerHttpResponse(exchange.getResponse(),
+                requestInfo, PulsarLogCollector.getInstance());
+        ServerWebExchange webExchange = exchange.mutate().request(loggingServerHttpRequest)
+                .response(loggingServerHttpResponse).build();
+        loggingServerHttpResponse.setExchange(webExchange);
+        return chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+    }
+
+    /**
+     * get plugin order.
+     *
+     * @return order
+     */
+    @Override
+    public int getOrder() {
+        return PluginEnum.LOGGING_PULSAR.getCode();
+    }
+
+    /**
+     * get plugin name.
+     *
+     * @return plugin name
+     */
+    @Override
+    public String named() {
+        return PluginEnum.LOGGING_PULSAR.getName();
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
new file mode 100644
index 000000000..04c1b0a9e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.client;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * queue-based logging collector.
+ */
+public class PulsarLogCollectClient implements LogConsumeClient {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarLogCollectClient.class);
+
+    private PulsarClient client;
+
+    private Producer<byte[]> producer;
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    /**
+     * init producer.
+     * 
+     * @param props pulsar props
+     */
+    public void initProducer(final Properties props) {
+        if (MapUtils.isEmpty(props)) {
+            LOG.error("Pulsar props is empty. Fail to init Pulsar producer.");
+            return;
+        }
+        if (isStarted.get()) {
+            close();
+        }
+        String topic = props.getProperty(GenericLoggingConstant.TOPIC);
+        String serviceUrl = props.getProperty(GenericLoggingConstant.SERVICE_URL);
+        if (StringUtils.isBlank(topic) || StringUtils.isBlank(serviceUrl)) {
+            LOG.error("init PulsarLogCollectClient error, please check topic or serviceUrl.");
+            return;
+        }
+        try {
+            client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            producer = client.newProducer().topic(topic).create();
+            LOG.info("init PulsarLogCollectClient success.");
+            isStarted.set(true);
+            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+
+        } catch (PulsarClientException e) {
+            LOG.error("init PulsarLogCollectClient error, ", e);
+        }
+
+    }
+
+    @Override
+    public void consume(final List<ShenyuRequestLog> logs) {
+        if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+            return;
+        }
+        logs.forEach(log -> {
+            producer.sendAsync(toBytes(log));
+        });
+    }
+
+    private byte[] toBytes(final ShenyuRequestLog log) {
+        byte[] bytes = JsonUtils.toJson(log).getBytes(StandardCharsets.UTF_8);
+        String compressAlg = StringUtils.defaultIfBlank(PulsarLogCollectConfig.INSTANCE.getPulsarLogConfig().getCompressAlg(), "");
+        if ("LZ4".equalsIgnoreCase(compressAlg.trim())) {
+            LZ4CompressData lz4CompressData = new LZ4CompressData(bytes.length, compressedByte(bytes));
+            return JsonUtils.toJson(lz4CompressData).getBytes(StandardCharsets.UTF_8);
+        } else {
+            return bytes;
+        }
+    }
+
+    private byte[] compressedByte(final byte[] srcByte) {
+        LZ4Factory factory = LZ4Factory.fastestInstance();
+        LZ4Compressor compressor = factory.fastCompressor();
+        return compressor.compress(srcByte);
+    }
+
+    @Override
+    public void close() {
+        if (Objects.nonNull(producer) && isStarted.get()) {
+            try {
+                producer.close();
+                client.close();
+                isStarted.set(false);
+            } catch (PulsarClientException e) {
+                LOG.error("fail to close PulsarLogCollectClient, e", e);
+            }
+        }
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollector.java
new file mode 100644
index 000000000..2b885c90e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.collector;
+
+import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.collector.AbstractLogCollector;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.pulsar.handler.LoggingPulsarPluginDataHandler;
+
+public class PulsarLogCollector extends AbstractLogCollector {
+
+    private static final LogCollector INSTANCE = new PulsarLogCollector();
+
+    /**
+     * get LogCollector Instance.
+     *
+     * @return LogCollector instance
+     */
+    public static LogCollector getInstance() {
+        return INSTANCE;
+    }
+    
+    @Override
+    protected LogConsumeClient getLogConsumeClient() {
+        return LoggingPulsarPluginDataHandler.getPulsarLogCollectClient();
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
new file mode 100644
index 000000000..2cb489979
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.config;
+
+import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+
+import java.util.Optional;
+
+public class PulsarLogCollectConfig {
+
+    public static final PulsarLogCollectConfig INSTANCE = new PulsarLogCollectConfig();
+
+    private PulsarLogConfig pulsarLogConfig;
+
+    /**
+     * get global log config.
+     *
+     * @return global log config
+     */
+    public PulsarLogConfig getPulsarLogConfig() {
+        return Optional.ofNullable(pulsarLogConfig).orElse(new PulsarLogConfig());
+    }
+
+    /**
+     * set global log config.
+     *
+     * @param pulsarLogConfig global log config
+     */
+    public void setPulsarLogConfig(final PulsarLogConfig pulsarLogConfig) {
+        this.pulsarLogConfig = pulsarLogConfig;
+    }
+
+    public static class PulsarLogConfig extends GenericGlobalConfig {
+
+        private String compressAlg;
+
+        private String topic;
+
+        private String serviceUrl;
+
+        /**
+         * whether compress.
+         *
+         * @return compress or not
+         */
+        public String getCompressAlg() {
+            return compressAlg;
+        }
+
+        /**
+         * set compress.
+         *
+         * @param compressAlg compress alg.
+         */
+        public void setCompressAlg(final String compressAlg) {
+            this.compressAlg = compressAlg;
+        }
+
+        /**
+         * get message queue topic.
+         *
+         * @return message queue topic
+         */
+        public String getTopic() {
+            return topic;
+        }
+
+        /**
+         * topic,used for message queue.
+         *
+         * @param topic mq topic
+         */
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+
+        /**
+         * get pulsar service URL.
+         * @return pulsar service URL
+         */
+        public String getServiceUrl() {
+            return serviceUrl;
+        }
+
+        /**
+         * set pulsar service URL.
+         * @param serviceUrl pulsar service URL
+         */
+        public void setServiceUrl(final String serviceUrl) {
+            this.serviceUrl = serviceUrl;
+        }
+    }
+
+    public static class LogApiConfig {
+
+        /**
+         * 0 means never sample, 1 means always sample. Minimum probability is 0.01, or 1% of logging
+         */
+        private String sampleRate;
+
+        /**
+         * This topic is useful if you use message queuing to collect logs.
+         */
+        private String topic;
+
+        /**
+         * get sample rate.
+         *
+         * @return sample rate
+         */
+        public String getSampleRate() {
+            return sampleRate;
+        }
+
+        /**
+         * set sample rate.
+         *
+         * @param sampleRate sample rate
+         */
+        public void setSampleRate(final String sampleRate) {
+            this.sampleRate = sampleRate;
+        }
+
+        /**
+         * get mq topic.
+         *
+         * @return mq topic
+         */
+        public String getTopic() {
+            return topic;
+        }
+
+        /**
+         * set mq topic.
+         *
+         * @param topic mq topic
+         */
+        public void setTopic(final String topic) {
+            this.topic = topic;
+        }
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
new file mode 100644
index 000000000..f6a97cd5d
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.handler;
+
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
+import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
+import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * The type logging pulsar plugin data handler.
+ */
+public class LoggingPulsarPluginDataHandler implements PluginDataHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingPulsarPluginDataHandler.class);
+
+    private static final PulsarLogCollectClient PULSAR_LOG_COLLECT_CLIENT = new PulsarLogCollectClient();
+
+    private static final String EMPTY_JSON = "{}";
+
+    private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP = new ConcurrentHashMap<>();
+
+    @Override
+    public void handlerPlugin(final PluginData pluginData) {
+        LOG.info("handler loggingPulsar Plugin data:{}", GsonUtils.getGson().toJson(pluginData));
+        if (pluginData.getEnabled()) {
+            PulsarLogCollectConfig.PulsarLogConfig globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
+                    PulsarLogCollectConfig.PulsarLogConfig.class);
+            PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(globalLogConfig);
+            // start pulsar producer
+            Properties properties = new Properties();
+            properties.setProperty(GenericLoggingConstant.TOPIC, globalLogConfig.getTopic());
+            properties.setProperty(GenericLoggingConstant.SERVICE_URL, globalLogConfig.getServiceUrl());
+            PULSAR_LOG_COLLECT_CLIENT.initProducer(properties);
+            PulsarLogCollector.getInstance().start();
+        } else {
+            try {
+                PulsarLogCollector.getInstance().close();
+            } catch (Exception e) {
+                LOG.error("close log collector error", e);
+            }
+        }
+    }
+
+    @Override
+    public void handlerSelector(final SelectorData selectorData) {
+        PluginDataHandler.super.handlerSelector(selectorData);
+    }
+
+    @Override
+    public void removeSelector(final SelectorData selectorData) {
+        PluginDataHandler.super.removeSelector(selectorData);
+    }
+
+    @Override
+    public String pluginNamed() {
+        return PluginEnum.LOGGING_PULSAR.getName();
+    }
+
+    /**
+     * get pulsar log collect client.
+     * @return pulsar log collect client
+     */
+    public static PulsarLogCollectClient getPulsarLogCollectClient() {
+        return PULSAR_LOG_COLLECT_CLIENT;
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
new file mode 100644
index 000000000..52ae59f98
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar;
+
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.RemoteAddressResolver;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.net.InetSocketAddress;
+
+@ExtendWith(MockitoExtension.class)
+public class LoggingPulsarPluginTest {
+
+    private LoggingPulsarPlugin loggingPulsarPlugin;
+
+    private ServerWebExchange exchange;
+
+    private RuleData ruleData;
+
+    private ShenyuPluginChain chain;
+
+    private SelectorData selectorData;
+
+    private ServerHttpRequest request;
+
+    private ShenyuRequestLog requestLog;
+
+    @BeforeEach
+    public void setUp() {
+        this.loggingPulsarPlugin = new LoggingPulsarPlugin();
+        this.ruleData = Mockito.mock(RuleData.class);
+        this.chain = Mockito.mock(ShenyuPluginChain.class);
+        this.selectorData = Mockito.mock(SelectorData.class);
+        this.request = Mockito.mock(ServerHttpRequest.class);
+        this.requestLog = new ShenyuRequestLog();
+        MockServerHttpRequest request = MockServerHttpRequest
+                .get("localhost")
+                .remoteAddress(new InetSocketAddress(8090))
+                .header("X-source", "mock test")
+                .queryParam("queryParam", "Hello,World")
+                .build();
+        ConfigurableApplicationContext context = Mockito.mock(ConfigurableApplicationContext.class);
+        SpringBeanUtils.getInstance().setApplicationContext(context);
+        RemoteAddressResolver remoteAddressResolver = new RemoteAddressResolver() {
+        };
+        Mockito.lenient().when(context.getBean(RemoteAddressResolver.class)).thenReturn(remoteAddressResolver);
+        this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+        ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+        exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+    }
+
+    @Test
+    public void testDoExecute() {
+        Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
+        Mono<Void> result = loggingPulsarPlugin.doLogExecute(exchange, chain, selectorData, ruleData, request, requestLog);
+        StepVerifier.create(result).expectSubscription().verifyComplete();
+    }
+
+    @Test
+    public void testGetOrder() {
+        Assertions.assertEquals(loggingPulsarPlugin.getOrder(), PluginEnum.LOGGING_PULSAR.getCode());
+    }
+
+    @Test
+    public void testNamed() {
+        Assertions.assertEquals(loggingPulsarPlugin.named(), PluginEnum.LOGGING_PULSAR.getName());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
new file mode 100644
index 000000000..c0c1589d5
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.client;
+
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class PulsarLogCollectClientTest {
+
+    private final PluginData pluginData = new PluginData();
+
+    private final Properties properties = new Properties();
+
+    private final List<ShenyuRequestLog> logs = new ArrayList<>();
+
+    private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+    private PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig;
+
+    private PulsarLogCollectClient pulsarLogCollectClient;
+
+    @BeforeEach
+    public void setup() {
+        this.pulsarLogCollectClient = new PulsarLogCollectClient();
+        pluginData.setEnabled(true);
+        pluginData.setConfig("{\"topic\":\"test\", \"serviceUrl\":\"test\"}");
+        pulsarLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), PulsarLogCollectConfig.PulsarLogConfig.class);
+        properties.setProperty(GenericLoggingConstant.TOPIC, pulsarLogConfig.getTopic());
+        properties.setProperty(GenericLoggingConstant.SERVICE_URL, pulsarLogConfig.getServiceUrl());
+        shenyuRequestLog.setClientIp("0.0.0.0");
+        shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+        logs.add(shenyuRequestLog);
+    }
+
+    @Test
+    public void testConsume() {
+        String msg = "";
+        PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(pulsarLogConfig);
+        pulsarLogCollectClient.initProducer(properties);
+        try {
+            pulsarLogCollectClient.consume(logs);
+        } catch (Exception e) {
+            msg = "false";
+        }
+        Assertions.assertEquals(msg, "");
+        pulsarLogCollectClient.close();
+    }
+}
+
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollectorTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollectorTest.java
new file mode 100644
index 000000000..e59fc72c1
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollectorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.collector;
+
+import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.collector.AbstractLogCollector;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+
+public class PulsarLogCollectorTest {
+
+    private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+    @BeforeEach
+    public void setup() {
+        shenyuRequestLog.setClientIp("0.0.0.0");
+        shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+    }
+
+    @Test
+    public void testAbstractLogCollector() throws Exception {
+        PulsarLogCollector.getInstance().start();
+        Field field1 = AbstractLogCollector.class.getDeclaredField("started");
+        field1.setAccessible(true);
+        Assertions.assertEquals(field1.get(PulsarLogCollector.getInstance()).toString(), "true");
+        PulsarLogCollector.getInstance().collect(shenyuRequestLog);
+        PulsarLogCollector.getInstance().close();
+        Field field2 = AbstractLogCollector.class.getDeclaredField("started");
+        field2.setAccessible(true);
+        Assertions.assertEquals(field2.get(PulsarLogCollector.getInstance()).toString(), "false");
+    }
+
+    @Test
+    public void testGetLogConsumeClient() {
+        LogConsumeClient logConsumeClient = new PulsarLogCollector().getLogConsumeClient();
+        Assertions.assertEquals(PulsarLogCollectClient.class, logConsumeClient.getClass());
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarCollectConfigTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarCollectConfigTest.java
new file mode 100644
index 000000000..a119c642e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarCollectConfigTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PulsarCollectConfigTest {
+
+    @Test
+    public void testSetLogApiConfigTopic() {
+        PulsarLogCollectConfig.LogApiConfig logApiConfig = new PulsarLogCollectConfig.LogApiConfig();
+        logApiConfig.setTopic("test");
+        Assertions.assertEquals(logApiConfig.getTopic(), "test");
+    }
+
+    @Test
+    public void testSetLogApiConfigSampleRate() {
+        PulsarLogCollectConfig.LogApiConfig logApiConfig = new PulsarLogCollectConfig.LogApiConfig();
+        logApiConfig.setSampleRate("1");
+        Assertions.assertEquals(logApiConfig.getSampleRate(), "1");
+    }
+
+    @Test
+    public void testGetGlobalLogConfigSampleRate() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setSampleRate("1");
+        Assertions.assertEquals(pulsarLogConfig.getSampleRate(), "1");
+    }
+
+    @Test
+    public void testSetGlobalLogConfigTopic() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setTopic("test");
+        Assertions.assertEquals(pulsarLogConfig.getTopic(), "test");
+    }
+
+    @Test
+    public void testSetGlobalLogConfigMaxResponseBody() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setMaxResponseBody(5);
+        Assertions.assertEquals(pulsarLogConfig.getMaxResponseBody(), 5);
+    }
+
+    @Test
+    public void testSetGlobalLogConfigMaxRequestBody() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setMaxRequestBody(5);
+        Assertions.assertEquals(pulsarLogConfig.getMaxRequestBody(), 5);
+    }
+
+    @Test
+    public void testSetGlobalLogConfigServiceUrl() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setServiceUrl("test");
+        Assertions.assertEquals(pulsarLogConfig.getServiceUrl(), "test");
+    }
+
+    @Test
+    public void testGetGlobalLogConfig() {
+        PulsarLogCollectConfig pulsarLogCollectConfig = new PulsarLogCollectConfig();
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogCollectConfig.setPulsarLogConfig(pulsarLogConfig);
+        Assertions.assertEquals(pulsarLogCollectConfig.getPulsarLogConfig(), pulsarLogConfig);
+    }
+
+    @Test
+    public void testCompressAlg() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setCompressAlg("LZ4");
+        Assertions.assertEquals(pulsarLogConfig.getCompressAlg(), "LZ4");
+    }
+
+    @Test
+    public void testBufferQueueSize() {
+        PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+        pulsarLogConfig.setBufferQueueSize(50000);
+        Assertions.assertEquals(pulsarLogConfig.getBufferQueueSize(), 50000);
+    }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandlerTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandlerTest.java
new file mode 100644
index 000000000..e43a3c7c6
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandlerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.handler;
+
+import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LoggingPulsarPluginDataHandlerTest {
+
+    private final SelectorData selectorData = new SelectorData();
+
+    private final ConditionData conditionData = new ConditionData();
+
+    private final PluginData pluginData = new PluginData();
+
+    private LoggingPulsarPluginDataHandler loggingPulsarPluginDataHandler;
+
+    @BeforeEach
+    public void setup() {
+        this.loggingPulsarPluginDataHandler = new LoggingPulsarPluginDataHandler();
+        selectorData.setId("1");
+        selectorData.setType(1);
+        selectorData.setHandle("{\"topic\":\"test\",\"sampleRate\":\"1\"}");
+        conditionData.setParamName("id");
+        conditionData.setParamType("uri");
+        conditionData.setParamValue("11");
+        conditionData.setOperator("=");
+        List<ConditionData> list = new ArrayList<>();
+        list.add(conditionData);
+        selectorData.setConditionList(list);
+        pluginData.setEnabled(true);
+        pluginData.setConfig("{\"topic\":\"test\", \"serviceUrl\":\"test\"}");
+    }
+
+    @Test
+    public void testHandlerPlugin() throws NoSuchFieldException, IllegalAccessException {
+        loggingPulsarPluginDataHandler.handlerPlugin(pluginData);
+        Field field = loggingPulsarPluginDataHandler.getClass().getDeclaredField("PULSAR_LOG_COLLECT_CLIENT");
+        field.setAccessible(true);
+        Assertions.assertEquals(field.get(loggingPulsarPluginDataHandler).getClass(), PulsarLogCollectClient.class);
+        pluginData.setEnabled(false);
+        loggingPulsarPluginDataHandler.handlerPlugin(pluginData);
+    }
+
+    @Test
+    public void testPluginNamed() {
+        Assertions.assertEquals(loggingPulsarPluginDataHandler.pluginNamed(), "loggingPulsar");
+    }
+
+    @Test
+    public void testGetPulsarLogCollectClient() {
+        Assertions.assertEquals(LoggingPulsarPluginDataHandler.getPulsarLogCollectClient().getClass(), PulsarLogCollectClient.class);
+    }
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
index b52f96ceb..13d84f909 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
@@ -64,5 +64,6 @@
         <module>shenyu-spring-boot-starter-plugin-cache</module>
         <module>shenyu-spring-boot-starter-plugin-mock</module>
         <module>shenyu-spring-boot-starter-plugin-logging-aliyun-sls</module>
+        <module>shenyu-spring-boot-starter-plugin-logging-pulsar</module>
     </modules>
 </project>
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/pom.xml
similarity index 51%
copy from shenyu-plugin/shenyu-plugin-logging/pom.xml
copy to shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/pom.xml
index 67a1cf229..249c20b17 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/pom.xml
@@ -16,22 +16,32 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.shenyu</groupId>
-        <artifactId>shenyu-plugin</artifactId>
+        <artifactId>shenyu-spring-boot-starter-plugin</artifactId>
         <version>2.5.1-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>shenyu-plugin-logging</artifactId>
-    <packaging>pom</packaging>
 
-    <modules>
-        <module>shenyu-plugin-logging-common</module>
-        <module>shenyu-plugin-logging-console</module>
-        <module>shenyu-plugin-logging-rocketmq</module>
-        <module>shenyu-plugin-logging-kafka</module>
-        <module>shenyu-plugin-logging-elasticsearch</module>
-        <module>shenyu-plugin-logging-aliyun-sls</module>
-    </modules>
+    <artifactId>shenyu-spring-boot-starter-plugin-logging-pulsar</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-plugin-logging-pulsar</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-test</artifactId>
+        </dependency>
+    </dependencies>
+    
 </project>
\ No newline at end of file
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/pulsar/LoggingPulsarPluginConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/pulsar/LoggingPulsarPluginConfiguration.java
new file mode 100644
index 000000000..213a1220d
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/pulsar/LoggingPulsarPluginConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.springboot.starter.plugin.logging.pulsar;
+
+import org.apache.shenyu.plugin.api.ShenyuPlugin;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.pulsar.LoggingPulsarPlugin;
+import org.apache.shenyu.plugin.logging.pulsar.handler.LoggingPulsarPluginDataHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * config logging Pulsar plugin.
+ */
+@Configuration
+@ConditionalOnProperty(value = {"shenyu.plugins.logging-pulsar.enabled"}, havingValue = "true", matchIfMissing = true)
+public class LoggingPulsarPluginConfiguration {
+
+    /**
+     * logging pulsar plugin data handler.
+     *
+     * @return logging pulsar PluginDataHandler
+     */
+    @Bean
+    public PluginDataHandler loggingPulsarPluginDataHandler() {
+        return new LoggingPulsarPluginDataHandler();
+    }
+
+    /**
+     * Logging Pulsar plugin.
+     *
+     * @return LoggingPulsarPlugin
+     */
+    @Bean
+    public ShenyuPlugin loggingPulsarPlugin() {
+        return new LoggingPulsarPlugin();
+    }
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.factories b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..648bbe794
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.shenyu.springboot.starter.plugin.logging.pulsar.LoggingPulsarPluginConfiguration
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.provides b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.provides
new file mode 100644
index 000000000..1aba60f24
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.provides
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+provides: shenyu-spring-boot-starter-plugin-logging-pulsar