You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by xi...@apache.org on 2022/09/01 03:05:39 UTC
[shenyu] branch master updated: [type:feature] Shenyu add logging-pulsar plugin (#3788)
This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new fb72cab93 [type:feature] Shenyu add logging-pulsar plugin (#3788)
fb72cab93 is described below
commit fb72cab937a06c4d5d550b796ef3719ea206aef8
Author: Bowen Li <27...@qq.com>
AuthorDate: Thu Sep 1 11:05:32 2022 +0800
[type:feature] Shenyu add logging-pulsar plugin (#3788)
* shenyu logging add pulsar
* shenyu logging add pulsar test
* fix checkstyle
* add license and refactor pom.xml
* update version to 2.5.1
* improve sql init
---
db/init/mysql/schema.sql | 7 +
db/init/oracle/schema.sql | 18 +++
db/init/pg/create-table.sql | 8 ++
db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql | 28 ++++
db/upgrade/2.5.0-upgrade-2.5.1-pg.sql | 28 ++++
pom.xml | 1 +
.../src/main/resources/sql-script/h2/schema.sql | 8 +-
shenyu-bootstrap/pom.xml | 8 ++
.../org/apache/shenyu/common/enums/PluginEnum.java | 5 +
.../src/main/release-docs/LICENSE | 1 +
shenyu-plugin/shenyu-plugin-logging/pom.xml | 1 +
.../common/constant/GenericLoggingConstant.java | 5 +
.../shenyu-plugin-logging-pulsar/pom.xml | 58 ++++++++
.../plugin/logging/pulsar/LoggingPulsarPlugin.java | 70 +++++++++
.../pulsar/client/PulsarLogCollectClient.java | 126 +++++++++++++++++
.../pulsar/collector/PulsarLogCollector.java | 42 ++++++
.../pulsar/config/PulsarLogCollectConfig.java | 157 +++++++++++++++++++++
.../handler/LoggingPulsarPluginDataHandler.java | 95 +++++++++++++
.../logging/pulsar/LoggingPulsarPluginTest.java | 103 ++++++++++++++
.../pulsar/client/PulsarLogCollectClientTest.java | 74 ++++++++++
.../pulsar/collector/PulsarLogCollectorTest.java | 58 ++++++++
.../pulsar/config/PulsarCollectConfigTest.java | 95 +++++++++++++
.../LoggingPulsarPluginDataHandlerTest.java | 78 ++++++++++
.../shenyu-spring-boot-starter-plugin/pom.xml | 1 +
.../pom.xml | 34 +++--
.../pulsar/LoggingPulsarPluginConfiguration.java | 54 +++++++
.../src/main/resources/META-INF/spring.factories | 19 +++
.../src/main/resources/META-INF/spring.provides | 18 +++
28 files changed, 1187 insertions(+), 13 deletions(-)
diff --git a/db/init/mysql/schema.sql b/db/init/mysql/schema.sql
index 9033b7727..d592ffd33 100644
--- a/db/init/mysql/schema.sql
+++ b/db/init/mysql/schema.sql
@@ -596,6 +596,7 @@ INSERT INTO `plugin` VALUES ('9', 'hystrix', NULL, 'FaultTolerance', 130, 0, '20
INSERT INTO `plugin` VALUES ('32', 'loggingElasticSearch','{\"host\":\"localhost\", \"port\": \"9200\"}', 'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00');
INSERT INTO `plugin` VALUES ('33', 'loggingKafka','{\"host\":\"localhost\", \"port\": \"9092\"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-02 22:00:00');
INSERT INTO `plugin` VALUES ('34', 'loggingAliyunSls','{\"projectName\": \"shenyu\", \"logStoreName\": \"shenyu-logstore\", \"topic\": \"shenyu-topic\"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', '{\"topic":\"shenyu-access-logging\", \"serviceUrl\": \"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
-- ----------------------------
-- Table structure for plugin_handle
@@ -811,6 +812,12 @@ INSERT INTO `plugin_handle` VALUES ('1529402613204172902', '34', 'sampleRate', '
INSERT INTO `plugin_handle` VALUES ('1529402613204172903', '34', 'maxRequestBody', 'maxRequestBody', 1, 3, 11, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
INSERT INTO `plugin_handle` VALUES ('1529402613204172904', '34', 'maxResponseBody', 'maxResponseBody', 1, 3, 12, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
INSERT INTO `plugin_handle` VALUES ('1529402613204172905', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{\"required\":\"0\",\"defaultValue\":50000}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172906', '35', 'topic', 'topic', 2, 3, 1, '{\"required\":\"1\",\"defaultValue\":\"shenyu-access-logging\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172907', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{\"required":"1",\"defaultValue\":\"pulsar://localhost:6650\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172908', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{\"required":"0",\"defaultValue\":\"1\",\"placeholder\":\"optional,0,0.01~1\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172909', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172910', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172911', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{\"required\":\"0\",\"defaultValue\":\"none\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
-- ----------------------------
-- Table structure for resource
-- ----------------------------
diff --git a/db/init/oracle/schema.sql b/db/init/oracle/schema.sql
index e04e08369..44be01687 100644
--- a/db/init/oracle/schema.sql
+++ b/db/init/oracle/schema.sql
@@ -854,6 +854,7 @@ INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, role
INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('32', 'loggingElasticSearch', '{"host":"localhost", "port": "9200"}', 'Logging', 190, 0);
INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('33', 'loggingKafka', '{"topic":"shenyu-access-logging", "namesrvAddr": "localhost:9092"}', 'Logging', 180, 0);
INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('34', 'loggingAliyunSls', '{"projectName": "shenyu", "logStoreName": "shenyu-logstore", "topic": "shenyu-topic"}', 'Logging', 175, '0');
+INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin(id)) */ INTO plugin (id, name, config, role, sort, enabled) VALUES ('35', 'loggingPulsar', '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', 'Logging', 185, '0')
@@ -1427,6 +1428,23 @@ values ('1518229897214468196', '34', 'maxResponseBody', 'maxResponseBody', 1, 3,
insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
values ('1518229897214468197', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{"required":"0","defaultValue":524288,"placeholder":""}');
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468198', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468199', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468200', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468201', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468202', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}');
+
+insert /*+ IGNORE_ROW_ON_DUPKEY_INDEX(plugin_handle(plugin_id, field, type)) */ into plugin_handle (ID, PLUGIN_ID, FIELD, LABEL, DATA_TYPE, TYPE, SORT, EXT_OBJ)
+values ('1518229897214468203', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}');
/** insert resource for resource */
diff --git a/db/init/pg/create-table.sql b/db/init/pg/create-table.sql
index bfe3004c1..ca07fb578 100644
--- a/db/init/pg/create-table.sql
+++ b/db/init/pg/create-table.sql
@@ -673,6 +673,8 @@ INSERT INTO "public"."plugin" VALUES ('31', 'mock', null, 'Mock', 1, 0, '2022-06
INSERT INTO "public"."plugin" VALUES ('32', 'loggingElasticSearch', '{"host":"localhost", "port": "9200"}', 'Logging', 190, 0, '2022-06-19 22:00:00', '2022-06-19 22:00:00');
INSERT INTO "public"."plugin" VALUES ('33', 'loggingKafka', '{"topic":"shenyu-access-logging", "namesrvAddr": "localhost:9092"}', 'Logging', 180, 0, '2022-07-04 22:00:00', '2022-07-04 22:00:00');
INSERT INTO "public"."plugin" VALUES ('34', 'loggingAliyunSls', '{"projectName": "shenyu", "logStoreName": "shenyu-logstore", "topic": "shenyu-topic"}', 'Logging', 175, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin" VALUES ('35', 'loggingPulsar', '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', 'Logging', 185, 0, '2022-05-25 18:08:01', '2022-05-25 18:08:01');
+
-- ----------------------------
-- Table structure for plugin_handle
@@ -899,6 +901,12 @@ INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524968', '34', 'sampl
INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524969', '34', 'maxRequestBody', 'maxRequestBody', 1, 3, 11, '{"required":"0","defaultValue":524288,"placeholder":""}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524970', '34', 'maxResponseBody', 'maxResponseBody', 1, 3, 12, '{"required":"0","defaultValue":524288,"placeholder":""}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524971', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{"required":"0","defaultValue":50000,"placeholder":""}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524972', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524973', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524974', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524975', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524976', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524977', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
-- ----------------------------
-- Table structure for resource
diff --git a/db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql b/db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql
new file mode 100644
index 000000000..4a5e015ec
--- /dev/null
+++ b/db/upgrade/2.5.0-upgrade-2.5.1-mysql.sql
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- this file works for MySQL.
+
+/* insert plugin for loggingPulsar */
+INSERT INTO `plugin` VALUES ('35', 'loggingPulsar', '{\"topic":\"shenyu-access-logging\", \"serviceUrl\": \"pulsar://localhost:6650\"}', 'Logging', 185, 0, '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+
+/* insert plugin_handle data for plugin loggingPulsar */
+INSERT INTO `plugin_handle` VALUES ('1529402613204172906', '35', 'topic', 'topic', 2, 3, 1, '{\"required\":\"1\",\"defaultValue\":\"shenyu-access-logging\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172907', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{\"required":"1",\"defaultValue\":\"pulsar://localhost:6650\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172908', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{\"required":"0",\"defaultValue\":\"1\",\"placeholder\":\"optional,0,0.01~1\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172909', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172910', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{\"required\":\"0\",\"defaultValue\":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO `plugin_handle` VALUES ('1529402613204172911', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{\"required\":\"0\",\"defaultValue\":\"none\"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
diff --git a/db/upgrade/2.5.0-upgrade-2.5.1-pg.sql b/db/upgrade/2.5.0-upgrade-2.5.1-pg.sql
new file mode 100644
index 000000000..6f459fe9c
--- /dev/null
+++ b/db/upgrade/2.5.0-upgrade-2.5.1-pg.sql
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- this file works for PostgreSQL, can not use "`" syntax.
+
+/* insert plugin for loggingPulsar */
+INSERT INTO "public"."plugin" VALUES ('35', 'loggingPulsar', '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', 'Logging', 185, 0, '2022-05-25 18:08:01', '2022-05-25 18:08:01');
+
+/* insert plugin_handle data for plugin loggingPulsar */
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524972', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524973', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524974', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524975', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524976', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
+INSERT INTO "public"."plugin_handle" VALUES ('1529403902783524977', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}', '2022-06-30 21:00:00', '2022-06-30 21:00:00');
diff --git a/pom.xml b/pom.xml
index a3f57ebfd..b75250b57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@
<spring-security.version>5.3.10.RELEASE</spring-security.version>
<grpc.version>1.33.1</grpc.version>
<rocketmq-client.version>4.9.3</rocketmq-client.version>
+ <pulsar-client.version>2.10.1</pulsar-client.version>
<lz4-java.version>1.8.0</lz4-java.version>
<elasticsearch-java.version>8.2.3</elasticsearch-java.version>
<jackson-databind.version>2.12.3</jackson-databind.version>
diff --git a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
index efc542ab7..1100f6ca0 100755
--- a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
@@ -389,6 +389,7 @@ INSERT IGNORE INTO `plugin` (`id`, `name`, `config`, `role`, `sort`, `enabled`)
INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('32', 'loggingElasticSearch', 'Logging', 190,'{"host":"localhost", "port": "9200"}', '0');
INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('33', 'loggingKafka', 'Logging', 180,'{"topic":"shenyu-access-logging", "namesrvAddr": "localhost:9092"}', '0');
INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('34', 'loggingAliyunSls', 'Logging', 175, '{"projectName": "shenyu", "logStoreName": "shenyu-logstore", "topic": "shenyu-topic"}', '0');
+INSERT IGNORE INTO `plugin` (`id`, `name`, `role`, `sort`, `config`, `enabled`) VALUES ('35', 'loggingPulsar', 'Logging', 185, '{"topic":"shenyu-access-logging", "serviceUrl": "pulsar://localhost:6650"}', '0');
/*insert plugin_handle data for sentinel*/
@@ -586,7 +587,12 @@ INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,
INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172903', '34', 'maxRequestBody', 'maxRequestBody', 1, 3, 11, '{"required":"0","defaultValue":524288,"placeholder":""}');
INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172904', '34', 'maxResponseBody', 'maxResponseBody', 1, 3, 12, '{"required":"0","defaultValue":524288,"placeholder":""}');
INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172905', '34', 'bufferQueueSize', 'bufferQueueSize', 1, 3, 13, '{"required":"0","defaultValue":50000,"placeholder":""}');
-
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172906', '35', 'topic', 'topic', 2, 3, 1, '{"required":"1","defaultValue":"shenyu-access-logging"}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172907', '35', 'serviceUrl', 'serviceUrl', 2, 3, 2, '{"required":"1","defaultValue":"pulsar://localhost:6650"}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172908', '35', 'sampleRate', 'sampleRate', 2, 3, 4, '{"required":"0","defaultValue":"1","placeholder":"optional,0,0.01~1"}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172909', '35', 'maxResponseBody', 'maxResponseBody', 1, 3, 5, '{"required":"0","defaultValue":524288}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172910', '35', 'maxRequestBody', 'maxRequestBody', 1, 3, 6, '{"required":"0","defaultValue":524288}');
+INSERT IGNORE INTO plugin_handle (`id`, `plugin_id`,`field`,`label`,`data_type`,`type`,`sort`,`ext_obj`) VALUES ('1529402613204172911', '35', 'compressAlg', 'compressAlg', 3, 3, 7, '{"required":"0","defaultValue":"none"}');
/** insert resource for resource */
INSERT IGNORE INTO `resource` (`id`, `parent_id`, `title`, `name`, `url`, `component`, `resource_type`, `sort`, `icon`, `is_leaf`, `is_route`, `perms`, `status`) VALUES('1346775491550474240','','SHENYU.MENU.PLUGIN.LIST','plug','/plug','PluginList','0','0','dashboard','0','0','','1');
diff --git a/shenyu-bootstrap/pom.xml b/shenyu-bootstrap/pom.xml
index 8639552e0..5490ba43e 100644
--- a/shenyu-bootstrap/pom.xml
+++ b/shenyu-bootstrap/pom.xml
@@ -495,6 +495,14 @@
</dependency>
<!-- shenyu logging-aliyunsls plugin end -->
+ <!-- shenyu logging-pulsar plugin start -->
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-spring-boot-starter-plugin-logging-pulsar</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- shenyu logging-pulsar plugin end -->
+
</dependencies>
<profiles>
<profile>
diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
index 6431fafba..da773d0e2 100644
--- a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
+++ b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/PluginEnum.java
@@ -147,6 +147,11 @@ public enum PluginEnum {
*/
LOGGING_KAFKA(180, 0, "loggingKafka"),
+ /**
+ * Logging Pulsar plugin enum.
+ */
+ LOGGING_PULSAR(185, 0, "loggingPulsar"),
+
/**
* Logging ElasticSearch plugin enum.
*/
diff --git a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
index 0cb40678b..7df288262 100644
--- a/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
+++ b/shenyu-dist/shenyu-bootstrap-dist/src/main/release-docs/LICENSE
@@ -507,6 +507,7 @@ The text of each license is the standard Apache 2.0 license.
kafka-clients 3.2.0: https://kafka.apache.org/, Apache 2.0
elasticsearch-java 8.2.3: https://github.com/elastic/elasticsearch-java, Apache 2.0
elasticsearch-rest-client 8.2.3: https://github.com/elastic/elasticsearch, Apache 2.0
+ pulsar-client 2.10.1: https://github.com/apache/pulsar, Apache 2.0
aliyun-log 0.6.57: https://github.com/aliyun/aliyun-log-java-producer, Apache 2.0
aliyun-log-producer 0.3.10: https://github.com/aliyun/aliyun-log-java-producer, Apache 2.0
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-plugin/shenyu-plugin-logging/pom.xml
index 67a1cf229..f9187f919 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-plugin/shenyu-plugin-logging/pom.xml
@@ -33,5 +33,6 @@
<module>shenyu-plugin-logging-kafka</module>
<module>shenyu-plugin-logging-elasticsearch</module>
<module>shenyu-plugin-logging-aliyun-sls</module>
+ <module>shenyu-plugin-logging-pulsar</module>
</modules>
</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
index 1204a6250..14d7fcb04 100644
--- a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-common/src/main/java/org/apache/shenyu/plugin/logging/common/constant/GenericLoggingConstant.java
@@ -111,6 +111,11 @@ public class GenericLoggingConstant {
* The constant NAMESERVER_ADDRESS.
*/
public static final String NAMESERVER_ADDRESS = "namesrvAddr";
+
+ /**
+ * The constant SERVICE_URL.
+ */
+ public static final String SERVICE_URL = "serviceUrl";
/**
* The constant PRODUCER_GROUP.
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/pom.xml b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/pom.xml
new file mode 100644
index 000000000..1b7d5cbda
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-logging</artifactId>
+ <version>2.5.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>shenyu-plugin-logging-pulsar</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-logging-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${pulsar-client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>${lz4-java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
new file mode 100644
index 000000000..ce02b3b3a
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPlugin.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar;
+
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.logging.common.AbstractLoggingPlugin;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpRequest;
+import org.apache.shenyu.plugin.logging.common.body.LoggingServerHttpResponse;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+/**
+ * Integrated pulsar collect log.
+ */
+public class LoggingPulsarPlugin extends AbstractLoggingPlugin {
+
+ @Override
+ public Mono<Void> doLogExecute(final ServerWebExchange exchange, final ShenyuPluginChain chain,
+ final SelectorData selector, final RuleData rule,
+ final ServerHttpRequest request, final ShenyuRequestLog requestInfo) {
+ LoggingServerHttpRequest loggingServerHttpRequest = new LoggingServerHttpRequest(request, requestInfo);
+ LoggingServerHttpResponse loggingServerHttpResponse = new LoggingServerHttpResponse(exchange.getResponse(),
+ requestInfo, PulsarLogCollector.getInstance());
+ ServerWebExchange webExchange = exchange.mutate().request(loggingServerHttpRequest)
+ .response(loggingServerHttpResponse).build();
+ loggingServerHttpResponse.setExchange(webExchange);
+ return chain.execute(webExchange).doOnError(loggingServerHttpResponse::logError);
+ }
+
+ /**
+ * get plugin order.
+ *
+ * @return order
+ */
+ @Override
+ public int getOrder() {
+ return PluginEnum.LOGGING_PULSAR.getCode();
+ }
+
+ /**
+ * get plugin name.
+ *
+ * @return plugin name
+ */
+ @Override
+ public String named() {
+ return PluginEnum.LOGGING_PULSAR.getName();
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
new file mode 100644
index 000000000..04c1b0a9e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.client;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.shenyu.common.utils.JsonUtils;
+import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * queue-based logging collector.
+ */
+public class PulsarLogCollectClient implements LogConsumeClient {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarLogCollectClient.class);
+
+ private PulsarClient client;
+
+ private Producer<byte[]> producer;
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ /**
+ * init producer.
+ *
+ * @param props pulsar props
+ */
+ public void initProducer(final Properties props) {
+ if (MapUtils.isEmpty(props)) {
+ LOG.error("Pulsar props is empty. Fail to init Pulsar producer.");
+ return;
+ }
+ if (isStarted.get()) {
+ close();
+ }
+ String topic = props.getProperty(GenericLoggingConstant.TOPIC);
+ String serviceUrl = props.getProperty(GenericLoggingConstant.SERVICE_URL);
+ if (StringUtils.isBlank(topic) || StringUtils.isBlank(serviceUrl)) {
+ LOG.error("init PulsarLogCollectClient error, please check topic or serviceUrl.");
+ return;
+ }
+ try {
+ client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ producer = client.newProducer().topic(topic).create();
+ LOG.info("init PulsarLogCollectClient success.");
+ isStarted.set(true);
+ Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+
+ } catch (PulsarClientException e) {
+ LOG.error("init PulsarLogCollectClient error, ", e);
+ }
+
+ }
+
+ @Override
+ public void consume(final List<ShenyuRequestLog> logs) {
+ if (CollectionUtils.isEmpty(logs) || !isStarted.get()) {
+ return;
+ }
+ logs.forEach(log -> {
+ producer.sendAsync(toBytes(log));
+ });
+ }
+
+ private byte[] toBytes(final ShenyuRequestLog log) {
+ byte[] bytes = JsonUtils.toJson(log).getBytes(StandardCharsets.UTF_8);
+ String compressAlg = StringUtils.defaultIfBlank(PulsarLogCollectConfig.INSTANCE.getPulsarLogConfig().getCompressAlg(), "");
+ if ("LZ4".equalsIgnoreCase(compressAlg.trim())) {
+ LZ4CompressData lz4CompressData = new LZ4CompressData(bytes.length, compressedByte(bytes));
+ return JsonUtils.toJson(lz4CompressData).getBytes(StandardCharsets.UTF_8);
+ } else {
+ return bytes;
+ }
+ }
+
+ private byte[] compressedByte(final byte[] srcByte) {
+ LZ4Factory factory = LZ4Factory.fastestInstance();
+ LZ4Compressor compressor = factory.fastCompressor();
+ return compressor.compress(srcByte);
+ }
+
+ @Override
+ public void close() {
+ if (Objects.nonNull(producer) && isStarted.get()) {
+ try {
+ producer.close();
+ client.close();
+ isStarted.set(false);
+ } catch (PulsarClientException e) {
+ LOG.error("fail to close PulsarLogCollectClient, e", e);
+ }
+ }
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollector.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollector.java
new file mode 100644
index 000000000..2b885c90e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.collector;
+
+import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.collector.AbstractLogCollector;
+import org.apache.shenyu.plugin.logging.common.collector.LogCollector;
+import org.apache.shenyu.plugin.logging.pulsar.handler.LoggingPulsarPluginDataHandler;
+
+public class PulsarLogCollector extends AbstractLogCollector {
+
+ private static final LogCollector INSTANCE = new PulsarLogCollector();
+
+ /**
+ * get LogCollector Instance.
+ *
+ * @return LogCollector instance
+ */
+ public static LogCollector getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ protected LogConsumeClient getLogConsumeClient() {
+ return LoggingPulsarPluginDataHandler.getPulsarLogCollectClient();
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
new file mode 100644
index 000000000..2cb489979
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarLogCollectConfig.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.config;
+
+import org.apache.shenyu.plugin.logging.common.config.GenericGlobalConfig;
+
+import java.util.Optional;
+
+public class PulsarLogCollectConfig {
+
+ public static final PulsarLogCollectConfig INSTANCE = new PulsarLogCollectConfig();
+
+ private PulsarLogConfig pulsarLogConfig;
+
+ /**
+ * get global log config.
+ *
+ * @return global log config
+ */
+ public PulsarLogConfig getPulsarLogConfig() {
+ return Optional.ofNullable(pulsarLogConfig).orElse(new PulsarLogConfig());
+ }
+
+ /**
+ * set global log config.
+ *
+ * @param pulsarLogConfig global log config
+ */
+ public void setPulsarLogConfig(final PulsarLogConfig pulsarLogConfig) {
+ this.pulsarLogConfig = pulsarLogConfig;
+ }
+
+ public static class PulsarLogConfig extends GenericGlobalConfig {
+
+ private String compressAlg;
+
+ private String topic;
+
+ private String serviceUrl;
+
+ /**
+ * whether compress.
+ *
+ * @return compress or not
+ */
+ public String getCompressAlg() {
+ return compressAlg;
+ }
+
+ /**
+ * set compress.
+ *
+ * @param compressAlg compress alg.
+ */
+ public void setCompressAlg(final String compressAlg) {
+ this.compressAlg = compressAlg;
+ }
+
+ /**
+ * get message queue topic.
+ *
+ * @return message queue topic
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * topic,used for message queue.
+ *
+ * @param topic mq topic
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * get pulsar service URL.
+ * @return pulsar service URL
+ */
+ public String getServiceUrl() {
+ return serviceUrl;
+ }
+
+ /**
+ * set pulsar service URL.
+ * @param serviceUrl pulsar service URL
+ */
+ public void setServiceUrl(final String serviceUrl) {
+ this.serviceUrl = serviceUrl;
+ }
+ }
+
+ public static class LogApiConfig {
+
+ /**
+ * 0 means never sample, 1 means always sample. Minimum probability is 0.01, or 1% of logging
+ */
+ private String sampleRate;
+
+ /**
+ * This topic is useful if you use message queuing to collect logs.
+ */
+ private String topic;
+
+ /**
+ * get sample rate.
+ *
+ * @return sample rate
+ */
+ public String getSampleRate() {
+ return sampleRate;
+ }
+
+ /**
+ * set sample rate.
+ *
+ * @param sampleRate sample rate
+ */
+ public void setSampleRate(final String sampleRate) {
+ this.sampleRate = sampleRate;
+ }
+
+ /**
+ * get mq topic.
+ *
+ * @return mq topic
+ */
+ public String getTopic() {
+ return topic;
+ }
+
+ /**
+ * set mq topic.
+ *
+ * @param topic mq topic
+ */
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
new file mode 100644
index 000000000..f6a97cd5d
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/main/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.handler;
+
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
+import org.apache.shenyu.plugin.logging.pulsar.collector.PulsarLogCollector;
+import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * The type logging pulsar plugin data handler.
+ */
+public class LoggingPulsarPluginDataHandler implements PluginDataHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingPulsarPluginDataHandler.class);
+
+ private static final PulsarLogCollectClient PULSAR_LOG_COLLECT_CLIENT = new PulsarLogCollectClient();
+
+ private static final String EMPTY_JSON = "{}";
+
+ private static final Map<String, List<String>> SELECT_ID_URI_LIST_MAP = new ConcurrentHashMap<>();
+
+ @Override
+ public void handlerPlugin(final PluginData pluginData) {
+ LOG.info("handler loggingPulsar Plugin data:{}", GsonUtils.getGson().toJson(pluginData));
+ if (pluginData.getEnabled()) {
+ PulsarLogCollectConfig.PulsarLogConfig globalLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(),
+ PulsarLogCollectConfig.PulsarLogConfig.class);
+ PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(globalLogConfig);
+ // start pulsar producer
+ Properties properties = new Properties();
+ properties.setProperty(GenericLoggingConstant.TOPIC, globalLogConfig.getTopic());
+ properties.setProperty(GenericLoggingConstant.SERVICE_URL, globalLogConfig.getServiceUrl());
+ PULSAR_LOG_COLLECT_CLIENT.initProducer(properties);
+ PulsarLogCollector.getInstance().start();
+ } else {
+ try {
+ PulsarLogCollector.getInstance().close();
+ } catch (Exception e) {
+ LOG.error("close log collector error", e);
+ }
+ }
+ }
+
+ @Override
+ public void handlerSelector(final SelectorData selectorData) {
+ PluginDataHandler.super.handlerSelector(selectorData);
+ }
+
+ @Override
+ public void removeSelector(final SelectorData selectorData) {
+ PluginDataHandler.super.removeSelector(selectorData);
+ }
+
+ @Override
+ public String pluginNamed() {
+ return PluginEnum.LOGGING_PULSAR.getName();
+ }
+
+ /**
+ * get pulsar log collect client.
+ * @return pulsar log collect client
+ */
+ public static PulsarLogCollectClient getPulsarLogCollectClient() {
+ return PULSAR_LOG_COLLECT_CLIENT;
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
new file mode 100644
index 000000000..52ae59f98
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/LoggingPulsarPluginTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar;
+
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.dto.RuleData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.plugin.api.RemoteAddressResolver;
+import org.apache.shenyu.plugin.api.ShenyuPluginChain;
+import org.apache.shenyu.plugin.api.context.ShenyuContext;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
+import org.springframework.mock.web.server.MockServerWebExchange;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.net.InetSocketAddress;
+
+@ExtendWith(MockitoExtension.class)
+public class LoggingPulsarPluginTest {
+
+ private LoggingPulsarPlugin loggingPulsarPlugin;
+
+ private ServerWebExchange exchange;
+
+ private RuleData ruleData;
+
+ private ShenyuPluginChain chain;
+
+ private SelectorData selectorData;
+
+ private ServerHttpRequest request;
+
+ private ShenyuRequestLog requestLog;
+
+ @BeforeEach
+ public void setUp() {
+ this.loggingPulsarPlugin = new LoggingPulsarPlugin();
+ this.ruleData = Mockito.mock(RuleData.class);
+ this.chain = Mockito.mock(ShenyuPluginChain.class);
+ this.selectorData = Mockito.mock(SelectorData.class);
+ this.request = Mockito.mock(ServerHttpRequest.class);
+ this.requestLog = new ShenyuRequestLog();
+ MockServerHttpRequest request = MockServerHttpRequest
+ .get("localhost")
+ .remoteAddress(new InetSocketAddress(8090))
+ .header("X-source", "mock test")
+ .queryParam("queryParam", "Hello,World")
+ .build();
+ ConfigurableApplicationContext context = Mockito.mock(ConfigurableApplicationContext.class);
+ SpringBeanUtils.getInstance().setApplicationContext(context);
+ RemoteAddressResolver remoteAddressResolver = new RemoteAddressResolver() {
+ };
+ Mockito.lenient().when(context.getBean(RemoteAddressResolver.class)).thenReturn(remoteAddressResolver);
+ this.exchange = Mockito.spy(MockServerWebExchange.from(request));
+ ShenyuContext shenyuContext = Mockito.mock(ShenyuContext.class);
+ exchange.getAttributes().put(Constants.CONTEXT, shenyuContext);
+ }
+
+ @Test
+ public void testDoExecute() {
+ Mockito.when(chain.execute(ArgumentMatchers.any())).thenReturn(Mono.empty());
+ Mono<Void> result = loggingPulsarPlugin.doLogExecute(exchange, chain, selectorData, ruleData, request, requestLog);
+ StepVerifier.create(result).expectSubscription().verifyComplete();
+ }
+
+ @Test
+ public void testGetOrder() {
+ Assertions.assertEquals(loggingPulsarPlugin.getOrder(), PluginEnum.LOGGING_PULSAR.getCode());
+ }
+
+ @Test
+ public void testNamed() {
+ Assertions.assertEquals(loggingPulsarPlugin.named(), PluginEnum.LOGGING_PULSAR.getName());
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
new file mode 100644
index 000000000..c0c1589d5
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/client/PulsarLogCollectClientTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.client;
+
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.plugin.logging.common.constant.GenericLoggingConstant;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.config.PulsarLogCollectConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class PulsarLogCollectClientTest {
+
+ private final PluginData pluginData = new PluginData();
+
+ private final Properties properties = new Properties();
+
+ private final List<ShenyuRequestLog> logs = new ArrayList<>();
+
+ private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+ private PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig;
+
+ private PulsarLogCollectClient pulsarLogCollectClient;
+
+ @BeforeEach
+ public void setup() {
+ this.pulsarLogCollectClient = new PulsarLogCollectClient();
+ pluginData.setEnabled(true);
+ pluginData.setConfig("{\"topic\":\"test\", \"serviceUrl\":\"test\"}");
+ pulsarLogConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), PulsarLogCollectConfig.PulsarLogConfig.class);
+ properties.setProperty(GenericLoggingConstant.TOPIC, pulsarLogConfig.getTopic());
+ properties.setProperty(GenericLoggingConstant.SERVICE_URL, pulsarLogConfig.getServiceUrl());
+ shenyuRequestLog.setClientIp("0.0.0.0");
+ shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+ logs.add(shenyuRequestLog);
+ }
+
+ @Test
+ public void testConsume() {
+ String msg = "";
+ PulsarLogCollectConfig.INSTANCE.setPulsarLogConfig(pulsarLogConfig);
+ pulsarLogCollectClient.initProducer(properties);
+ try {
+ pulsarLogCollectClient.consume(logs);
+ } catch (Exception e) {
+ msg = "false";
+ }
+ Assertions.assertEquals(msg, "");
+ pulsarLogCollectClient.close();
+ }
+}
+
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollectorTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollectorTest.java
new file mode 100644
index 000000000..e59fc72c1
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/collector/PulsarLogCollectorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.collector;
+
+import org.apache.shenyu.plugin.logging.common.client.LogConsumeClient;
+import org.apache.shenyu.plugin.logging.common.collector.AbstractLogCollector;
+import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
+import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+
+public class PulsarLogCollectorTest {
+
+ private final ShenyuRequestLog shenyuRequestLog = new ShenyuRequestLog();
+
+ @BeforeEach
+ public void setup() {
+ shenyuRequestLog.setClientIp("0.0.0.0");
+ shenyuRequestLog.setPath("org/apache/shenyu/plugin/logging");
+ }
+
+ @Test
+ public void testAbstractLogCollector() throws Exception {
+ PulsarLogCollector.getInstance().start();
+ Field field1 = AbstractLogCollector.class.getDeclaredField("started");
+ field1.setAccessible(true);
+ Assertions.assertEquals(field1.get(PulsarLogCollector.getInstance()).toString(), "true");
+ PulsarLogCollector.getInstance().collect(shenyuRequestLog);
+ PulsarLogCollector.getInstance().close();
+ Field field2 = AbstractLogCollector.class.getDeclaredField("started");
+ field2.setAccessible(true);
+ Assertions.assertEquals(field2.get(PulsarLogCollector.getInstance()).toString(), "false");
+ }
+
+ @Test
+ public void testGetLogConsumeClient() {
+ LogConsumeClient logConsumeClient = new PulsarLogCollector().getLogConsumeClient();
+ Assertions.assertEquals(PulsarLogCollectClient.class, logConsumeClient.getClass());
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarCollectConfigTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarCollectConfigTest.java
new file mode 100644
index 000000000..a119c642e
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/config/PulsarCollectConfigTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PulsarCollectConfigTest {
+
+ @Test
+ public void testSetLogApiConfigTopic() {
+ PulsarLogCollectConfig.LogApiConfig logApiConfig = new PulsarLogCollectConfig.LogApiConfig();
+ logApiConfig.setTopic("test");
+ Assertions.assertEquals(logApiConfig.getTopic(), "test");
+ }
+
+ @Test
+ public void testSetLogApiConfigSampleRate() {
+ PulsarLogCollectConfig.LogApiConfig logApiConfig = new PulsarLogCollectConfig.LogApiConfig();
+ logApiConfig.setSampleRate("1");
+ Assertions.assertEquals(logApiConfig.getSampleRate(), "1");
+ }
+
+ @Test
+ public void testGetGlobalLogConfigSampleRate() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setSampleRate("1");
+ Assertions.assertEquals(pulsarLogConfig.getSampleRate(), "1");
+ }
+
+ @Test
+ public void testSetGlobalLogConfigTopic() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setTopic("test");
+ Assertions.assertEquals(pulsarLogConfig.getTopic(), "test");
+ }
+
+ @Test
+ public void testSetGlobalLogConfigMaxResponseBody() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setMaxResponseBody(5);
+ Assertions.assertEquals(pulsarLogConfig.getMaxResponseBody(), 5);
+ }
+
+ @Test
+ public void testSetGlobalLogConfigMaxRequestBody() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setMaxRequestBody(5);
+ Assertions.assertEquals(pulsarLogConfig.getMaxRequestBody(), 5);
+ }
+
+ @Test
+ public void testSetGlobalLogConfigServiceUrl() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setServiceUrl("test");
+ Assertions.assertEquals(pulsarLogConfig.getServiceUrl(), "test");
+ }
+
+ @Test
+ public void testGetGlobalLogConfig() {
+ PulsarLogCollectConfig pulsarLogCollectConfig = new PulsarLogCollectConfig();
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogCollectConfig.setPulsarLogConfig(pulsarLogConfig);
+ Assertions.assertEquals(pulsarLogCollectConfig.getPulsarLogConfig(), pulsarLogConfig);
+ }
+
+ @Test
+ public void testCompressAlg() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setCompressAlg("LZ4");
+ Assertions.assertEquals(pulsarLogConfig.getCompressAlg(), "LZ4");
+ }
+
+ @Test
+ public void testBufferQueueSize() {
+ PulsarLogCollectConfig.PulsarLogConfig pulsarLogConfig = new PulsarLogCollectConfig.PulsarLogConfig();
+ pulsarLogConfig.setBufferQueueSize(50000);
+ Assertions.assertEquals(pulsarLogConfig.getBufferQueueSize(), 50000);
+ }
+}
diff --git a/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandlerTest.java b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandlerTest.java
new file mode 100644
index 000000000..e43a3c7c6
--- /dev/null
+++ b/shenyu-plugin/shenyu-plugin-logging/shenyu-plugin-logging-pulsar/src/test/java/org/apache/shenyu/plugin/logging/pulsar/handler/LoggingPulsarPluginDataHandlerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.logging.pulsar.handler;
+
+import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.PluginData;
+import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.plugin.logging.pulsar.client.PulsarLogCollectClient;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+
+public class LoggingPulsarPluginDataHandlerTest {
+
+ private final SelectorData selectorData = new SelectorData();
+
+ private final ConditionData conditionData = new ConditionData();
+
+ private final PluginData pluginData = new PluginData();
+
+ private LoggingPulsarPluginDataHandler loggingPulsarPluginDataHandler;
+
+ @BeforeEach
+ public void setup() {
+ this.loggingPulsarPluginDataHandler = new LoggingPulsarPluginDataHandler();
+ selectorData.setId("1");
+ selectorData.setType(1);
+ selectorData.setHandle("{\"topic\":\"test\",\"sampleRate\":\"1\"}");
+ conditionData.setParamName("id");
+ conditionData.setParamType("uri");
+ conditionData.setParamValue("11");
+ conditionData.setOperator("=");
+ List<ConditionData> list = new ArrayList<>();
+ list.add(conditionData);
+ selectorData.setConditionList(list);
+ pluginData.setEnabled(true);
+ pluginData.setConfig("{\"topic\":\"test\", \"serviceUrl\":\"test\"}");
+ }
+
+ @Test
+ public void testHandlerPlugin() throws NoSuchFieldException, IllegalAccessException {
+ loggingPulsarPluginDataHandler.handlerPlugin(pluginData);
+ Field field = loggingPulsarPluginDataHandler.getClass().getDeclaredField("PULSAR_LOG_COLLECT_CLIENT");
+ field.setAccessible(true);
+ Assertions.assertEquals(field.get(loggingPulsarPluginDataHandler).getClass(), PulsarLogCollectClient.class);
+ pluginData.setEnabled(false);
+ loggingPulsarPluginDataHandler.handlerPlugin(pluginData);
+ }
+
+ @Test
+ public void testPluginNamed() {
+ Assertions.assertEquals(loggingPulsarPluginDataHandler.pluginNamed(), "loggingPulsar");
+ }
+
+ @Test
+ public void testGetPulsarLogCollectClient() {
+ Assertions.assertEquals(LoggingPulsarPluginDataHandler.getPulsarLogCollectClient().getClass(), PulsarLogCollectClient.class);
+ }
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
index b52f96ceb..13d84f909 100644
--- a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/pom.xml
@@ -64,5 +64,6 @@
<module>shenyu-spring-boot-starter-plugin-cache</module>
<module>shenyu-spring-boot-starter-plugin-mock</module>
<module>shenyu-spring-boot-starter-plugin-logging-aliyun-sls</module>
+ <module>shenyu-spring-boot-starter-plugin-logging-pulsar</module>
</modules>
</project>
diff --git a/shenyu-plugin/shenyu-plugin-logging/pom.xml b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/pom.xml
similarity index 51%
copy from shenyu-plugin/shenyu-plugin-logging/pom.xml
copy to shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/pom.xml
index 67a1cf229..249c20b17 100644
--- a/shenyu-plugin/shenyu-plugin-logging/pom.xml
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/pom.xml
@@ -16,22 +16,32 @@
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.shenyu</groupId>
- <artifactId>shenyu-plugin</artifactId>
+ <artifactId>shenyu-spring-boot-starter-plugin</artifactId>
<version>2.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>shenyu-plugin-logging</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>shenyu-plugin-logging-common</module>
- <module>shenyu-plugin-logging-console</module>
- <module>shenyu-plugin-logging-rocketmq</module>
- <module>shenyu-plugin-logging-kafka</module>
- <module>shenyu-plugin-logging-elasticsearch</module>
- <module>shenyu-plugin-logging-aliyun-sls</module>
- </modules>
+ <artifactId>shenyu-spring-boot-starter-plugin-logging-pulsar</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-plugin-logging-pulsar</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-test</artifactId>
+ </dependency>
+ </dependencies>
+
</project>
\ No newline at end of file
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/pulsar/LoggingPulsarPluginConfiguration.java b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/pulsar/LoggingPulsarPluginConfiguration.java
new file mode 100644
index 000000000..213a1220d
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/java/org/apache/shenyu/springboot/starter/plugin/logging/pulsar/LoggingPulsarPluginConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.springboot.starter.plugin.logging.pulsar;
+
+import org.apache.shenyu.plugin.api.ShenyuPlugin;
+import org.apache.shenyu.plugin.base.handler.PluginDataHandler;
+import org.apache.shenyu.plugin.logging.pulsar.LoggingPulsarPlugin;
+import org.apache.shenyu.plugin.logging.pulsar.handler.LoggingPulsarPluginDataHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * config logging Pulsar plugin.
+ */
+@Configuration
+@ConditionalOnProperty(value = {"shenyu.plugins.logging-pulsar.enabled"}, havingValue = "true", matchIfMissing = true)
+public class LoggingPulsarPluginConfiguration {
+
+ /**
+ * logging pulsar plugin data handler.
+ *
+ * @return logging pulsar PluginDataHandler
+ */
+ @Bean
+ public PluginDataHandler loggingPulsarPluginDataHandler() {
+ return new LoggingPulsarPluginDataHandler();
+ }
+
+ /**
+ * Logging Pulsar plugin.
+ *
+ * @return LoggingPulsarPlugin
+ */
+ @Bean
+ public ShenyuPlugin loggingPulsarPlugin() {
+ return new LoggingPulsarPlugin();
+ }
+}
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.factories b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..648bbe794
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.shenyu.springboot.starter.plugin.logging.pulsar.LoggingPulsarPluginConfiguration
diff --git a/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.provides b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.provides
new file mode 100644
index 000000000..1aba60f24
--- /dev/null
+++ b/shenyu-spring-boot-starter/shenyu-spring-boot-starter-plugin/shenyu-spring-boot-starter-plugin-logging-pulsar/src/main/resources/META-INF/spring.provides
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+provides: shenyu-spring-boot-starter-plugin-logging-pulsar