You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/07 08:55:52 UTC
[inlong] branch master updated: [INLONG-7518][Audit] Store support Kafka (#7463)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 12ee6c42c [INLONG-7518][Audit] Store support Kafka (#7463)
12ee6c42c is described below
commit 12ee6c42c8e7e4869c170ae97656ff79c323e881
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Mar 7 16:55:44 2023 +0800
[INLONG-7518][Audit] Store support Kafka (#7463)
---
inlong-audit/audit-store/pom.xml | 4 +
.../inlong/audit/config/MessageQueueConfig.java | 28 +++++
.../audit/service/AuditMsgConsumerServer.java | 5 +-
.../inlong/audit/service/consume/KafkaConsume.java | 129 +++++++++++++++++++++
.../audit/service/consume/KafkaConsumeTest.java | 84 ++++++++++++++
.../src/test/resources/application-test.properties | 8 +-
inlong-audit/conf/application.properties | 8 +-
7 files changed, 263 insertions(+), 3 deletions(-)
diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 75b063103..0141bd47c 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -129,6 +129,10 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index a95e9066c..05036ff55 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -66,6 +66,30 @@ public class MessageQueueConfig {
@Value("${audit.tube.consumer.thread.num:4}")
private int tubeThreadNum;
+ @Value("${audit.kafka.server.url:}")
+ private String kafkaServerUrl;
+
+ @Value("${audit.kafka.topic:}")
+ private String kafkaTopic;
+
+ @Value("${audit.kafka.consumer.name:}")
+ private String kafkaConsumerName;
+
+ @Value("${audit.kafka.group.id:audit-consumer-group}")
+ private String kafkaGroupId;
+
+ @Value("${audit.kafka.enable.auto.commit:true}")
+ private String enableAutoCommit;
+
+ @Value("${audit.kafka.auto.commit.interval.ms:1000}")
+ private String autoCommitIntervalMs;
+
+ @Value("${audit.kafka.fetch.wait.ms:100}")
+ private long fetchWaitMs = 100;
+
+ @Value("${audit.kafka.auto.offset.reset:earliest}")
+ private String autoOffsetReset;
+
@Value("${audit.config.proxy.type:pulsar}")
private String mqType;
@@ -77,4 +101,8 @@ public class MessageQueueConfig {
return mqType.trim().equalsIgnoreCase("tube");
}
+ public boolean isKafka() {
+ return mqType.trim().equalsIgnoreCase("kafka");
+ }
+
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 502eab325..5fbe0f6ad 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -22,6 +22,7 @@ import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.db.dao.AuditDataDao;
import org.apache.inlong.audit.service.consume.BaseConsume;
+import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
import org.slf4j.Logger;
@@ -60,8 +61,10 @@ public class AuditMsgConsumerServer implements InitializingBean {
mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
} else if (mqConfig.isTube()) {
mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
+ } else if (mqConfig.isKafka()) {
+ mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
} else {
- LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
+ LOG.error("unknown MessageQueue {}", mqConfig.getMqType());
return;
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
new file mode 100644
index 000000000..b041ed617
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.service.InsertData;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaConsume extends BaseConsume {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsume.class);
+ private KafkaConsumer<String, byte[]> consumer;
+ private String serverUrl;
+ private String topic;
+
+ /**
+ * Constructor
+ *
+ * @param insertServiceList
+ * @param storeConfig
+ * @param mqConfig
+ */
+ public KafkaConsume(List<InsertData> insertServiceList, StoreConfig storeConfig, MessageQueueConfig mqConfig) {
+ super(insertServiceList, storeConfig, mqConfig);
+ }
+
+ @Override
+ public void start() {
+ serverUrl = mqConfig.getKafkaServerUrl();
+ topic = mqConfig.getKafkaTopic();
+ boolean isAutoCommit = Boolean.getBoolean(mqConfig.getEnableAutoCommit());
+ Preconditions.checkArgument(StringUtils.isNotEmpty(serverUrl), "no kafka server url specified");
+ Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getKafkaTopic()),
+ "no kafka topic topic specified");
+ Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getKafkaConsumerName()),
+ "no kafka consume name specified");
+
+ initConsumer(mqConfig);
+
+ Thread thread = new Thread(new Fetcher(consumer, topic, isAutoCommit, mqConfig.getFetchWaitMs()),
+ "KafkaConsume_Fetcher_Thread");
+ thread.start();
+ }
+
+ protected void initConsumer(MessageQueueConfig mqConfig) {
+ LOG.info("init kafka consumer, topic:{}, serverUrl:{}", topic, serverUrl);
+
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, mqConfig.getKafkaGroupId());
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, mqConfig.getEnableAutoCommit());
+ properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, mqConfig.getAutoCommitIntervalMs());
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, mqConfig.getAutoOffsetReset());
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(Collections.singleton(topic));
+ }
+
+ public class Fetcher implements Runnable {
+
+ private final KafkaConsumer<String, byte[]> consumer;
+ private final String topic;
+ private final boolean isAutoCommit;
+ private final long fetchWaitMs;
+
+ public Fetcher(KafkaConsumer<String, byte[]> consumer, String topic, boolean isAutoCommit, long fetchWaitMs) {
+ this.consumer = consumer;
+ this.topic = topic;
+ this.isAutoCommit = isAutoCommit;
+ this.fetchWaitMs = fetchWaitMs;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ // Set the waiting time of the consumer to 100ms
+ ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(fetchWaitMs));
+ if (records != null && !records.isEmpty()) {
+ for (ConsumerRecord<String, byte[]> record : records) {
+ if (StringUtils.equals(record.topic(), topic)) {
+ String body = new String(record.value(), StandardCharsets.UTF_8);
+ handleMessage(body);
+ }
+ }
+
+ if (!isAutoCommit) {
+ consumer.commitAsync();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("kafka consumer get message error {}", e.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
new file mode 100644
index 000000000..bb2749206
--- /dev/null
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ClickHouseService;
+import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.service.MySqlService;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class KafkaConsumeTest {
+
+ private KafkaConsumer consumer;
+ private AuditDataDao auditDataDao;
+ private ElasticsearchService esService;
+ private ClickHouseConfig ckConfig;
+ private StoreConfig storeConfig;
+ private MessageQueueConfig mqConfig;
+ private String topic = "inlong-audit";
+ private ConsumerRecords records;
+
+ @Before
+ public void setUp() {
+ consumer = mock(KafkaConsumer.class);
+ consumer.subscribe(Collections.singleton(topic));
+ records = mock(ConsumerRecords.class);
+ when(consumer.poll(Duration.ofMillis(100))).thenReturn(records);
+ }
+
+ /**
+ * test kafka consumer
+ */
+ @Test
+ public void testConsumer() {
+ List<InsertData> insertServiceList = this.getInsertServiceList();
+ Thread consumeFetch = new Thread(new KafkaConsume(insertServiceList, storeConfig, mqConfig).new Fetcher(
+ consumer, topic, true, 100), "Fetch_Thread");
+ consumeFetch.start();
+ consumeFetch.interrupt();
+ }
+
+ /**
+ * getInsertServiceList
+ *
+ * @return InsertDataList
+ */
+ private List<InsertData> getInsertServiceList() {
+ List<InsertData> insertData = new ArrayList<>();
+ insertData.add(new MySqlService(auditDataDao));
+ insertData.add(esService);
+ insertData.add(new ClickHouseService(ckConfig));
+ return insertData;
+ }
+}
diff --git a/inlong-audit/audit-store/src/test/resources/application-test.properties b/inlong-audit/audit-store/src/test/resources/application-test.properties
index 2184995c3..2848b93c6 100644
--- a/inlong-audit/audit-store/src/test/resources/application-test.properties
+++ b/inlong-audit/audit-store/src/test/resources/application-test.properties
@@ -50,7 +50,7 @@ audit.config.manager.server.url=http://127.0.0.1:8000
# store.server: elasticsearch / mysql
audit.config.store.mode=elasticsearch
-# proxy.type: pulsar / tube
+# proxy.type: pulsar / tube / kafka
audit.config.proxy.type=pulsar
# pulsar config
@@ -63,6 +63,12 @@ audit.tube.masterlist=127.0.0.1:8715
audit.tube.topic=inlong-audit
audit.tube.consumer.group.name=inlong-audit-consumer
+# kafka config
+audit.kafka.server.url=127.0.0.1:9092
+audit.kafka.topic=inlong-audit
+audit.kafka.consumer.name=inlong-audit-consumer
+audit.kafka.group.id=audit-consumer-group
+
# es config
elasticsearch.host=127.0.0.1
elasticsearch.port=9200
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 5bfc3b230..b2027f73d 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -42,7 +42,7 @@ spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20
mybatis.mapper-locations=classpath*:mapper/*.xml
mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
-# proxy.type: pulsar / tube
+# proxy.type: pulsar / tube / kafka
audit.config.proxy.type=pulsar
# store.server: mysql / clickhouse / elasticsearch
@@ -60,6 +60,12 @@ audit.tube.masterlist=127.0.0.1:8715
audit.tube.topic=inlong-audit
audit.tube.consumer.group.name=inlong-audit-consumer
+# kafka config
+audit.kafka.server.url=127.0.0.1:9092
+audit.kafka.topic=inlong-audit
+audit.kafka.consumer.name=inlong-audit-consumer
+audit.kafka.group.id=audit-consumer-group
+
# es config
elasticsearch.host=127.0.0.1
elasticsearch.port=9200