You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/07 08:55:52 UTC

[inlong] branch master updated: [INLONG-7518][Audit] Store support Kafka (#7463)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 12ee6c42c [INLONG-7518][Audit] Store support Kafka (#7463)
12ee6c42c is described below

commit 12ee6c42c8e7e4869c170ae97656ff79c323e881
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Mar 7 16:55:44 2023 +0800

    [INLONG-7518][Audit] Store support Kafka (#7463)
---
 inlong-audit/audit-store/pom.xml                   |   4 +
 .../inlong/audit/config/MessageQueueConfig.java    |  28 +++++
 .../audit/service/AuditMsgConsumerServer.java      |   5 +-
 .../inlong/audit/service/consume/KafkaConsume.java | 129 +++++++++++++++++++++
 .../audit/service/consume/KafkaConsumeTest.java    |  84 ++++++++++++++
 .../src/test/resources/application-test.properties |   8 +-
 inlong-audit/conf/application.properties           |   8 +-
 7 files changed, 263 insertions(+), 3 deletions(-)

diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 75b063103..0141bd47c 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -129,6 +129,10 @@
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-high-level-client</artifactId>
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index a95e9066c..05036ff55 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -66,6 +66,30 @@ public class MessageQueueConfig {
     @Value("${audit.tube.consumer.thread.num:4}")
     private int tubeThreadNum;
 
+    // Kafka bootstrap server list, e.g. 127.0.0.1:9092 (empty default: must be configured)
+    @Value("${audit.kafka.server.url:}")
+    private String kafkaServerUrl;
+
+    // Topic the audit store consumes messages from (empty default: must be configured)
+    @Value("${audit.kafka.topic:}")
+    private String kafkaTopic;
+
+    // Consumer name; only checked for presence in KafkaConsume.start()
+    @Value("${audit.kafka.consumer.name:}")
+    private String kafkaConsumerName;
+
+    // Kafka consumer group id (enable.auto.commit offsets are committed under this group)
+    @Value("${audit.kafka.group.id:audit-consumer-group}")
+    private String kafkaGroupId;
+
+    // Forwarded verbatim to the kafka enable.auto.commit consumer property,
+    // hence kept as a String rather than a boolean
+    @Value("${audit.kafka.enable.auto.commit:true}")
+    private String enableAutoCommit;
+
+    // Auto-commit interval in ms (String form, forwarded to the kafka consumer properties)
+    @Value("${audit.kafka.auto.commit.interval.ms:1000}")
+    private String autoCommitIntervalMs;
+
+    // Max wait per consumer poll, in ms. NOTE(review): the field initializer
+    // duplicates the @Value default of 100 -- redundant but harmless
+    @Value("${audit.kafka.fetch.wait.ms:100}")
+    private long fetchWaitMs = 100;
+
+    // Where to start consuming when no committed offset exists (earliest/latest/none)
+    @Value("${audit.kafka.auto.offset.reset:earliest}")
+    private String autoOffsetReset;
+
     @Value("${audit.config.proxy.type:pulsar}")
     private String mqType;
 
@@ -77,4 +101,8 @@ public class MessageQueueConfig {
         return mqType.trim().equalsIgnoreCase("tube");
     }
 
+    /**
+     * Whether the configured message queue type is Kafka (case-insensitive).
+     *
+     * @return true if audit.config.proxy.type is "kafka"
+     */
+    public boolean isKafka() {
+        return mqType.trim().equalsIgnoreCase("kafka");
+    }
+
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 502eab325..5fbe0f6ad 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -22,6 +22,7 @@ import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.db.dao.AuditDataDao;
 import org.apache.inlong.audit.service.consume.BaseConsume;
+import org.apache.inlong.audit.service.consume.KafkaConsume;
 import org.apache.inlong.audit.service.consume.PulsarConsume;
 import org.apache.inlong.audit.service.consume.TubeConsume;
 import org.slf4j.Logger;
@@ -60,8 +61,10 @@ public class AuditMsgConsumerServer implements InitializingBean {
             mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
         } else if (mqConfig.isTube()) {
             mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
+        } else if (mqConfig.isKafka()) {
+            mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
         } else {
-            LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
+            LOG.error("unknown MessageQueue {}", mqConfig.getMqType());
             return;
         }
 
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
new file mode 100644
index 000000000..b041ed617
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.service.InsertData;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaConsume extends BaseConsume {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsume.class);
+    private KafkaConsumer<String, byte[]> consumer;
+    private String serverUrl;
+    private String topic;
+
+    /**
+     * Constructor.
+     *
+     * @param insertServiceList sinks each consumed audit message is handed to
+     * @param storeConfig audit store configuration
+     * @param mqConfig message queue configuration holding the audit.kafka.* settings
+     */
+    public KafkaConsume(List<InsertData> insertServiceList, StoreConfig storeConfig, MessageQueueConfig mqConfig) {
+        super(insertServiceList, storeConfig, mqConfig);
+    }
+
+    /**
+     * Validate the Kafka settings, build the consumer and launch the fetcher thread.
+     *
+     * @throws IllegalArgumentException if server url, topic or consumer name is missing
+     */
+    @Override
+    public void start() {
+        serverUrl = mqConfig.getKafkaServerUrl();
+        topic = mqConfig.getKafkaTopic();
+        // BUGFIX: Boolean.getBoolean(name) looks up a JVM *system property* named
+        // `name`; it never parses its argument, so auto-commit was always reported
+        // as false. Boolean.parseBoolean parses the configured value itself.
+        boolean isAutoCommit = Boolean.parseBoolean(mqConfig.getEnableAutoCommit());
+        Preconditions.checkArgument(StringUtils.isNotEmpty(serverUrl), "no kafka server url specified");
+        // BUGFIX: removed the duplicated word in the original "no kafka topic topic" message
+        Preconditions.checkArgument(StringUtils.isNotEmpty(topic), "no kafka topic specified");
+        Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getKafkaConsumerName()),
+                "no kafka consume name specified");
+
+        initConsumer(mqConfig);
+
+        Thread thread = new Thread(new Fetcher(consumer, topic, isAutoCommit, mqConfig.getFetchWaitMs()),
+                "KafkaConsume_Fetcher_Thread");
+        thread.start();
+    }
+
+    /**
+     * Create the KafkaConsumer from the configured properties and subscribe to the audit topic.
+     *
+     * @param mqConfig message queue configuration holding the kafka.* settings
+     */
+    protected void initConsumer(MessageQueueConfig mqConfig) {
+        LOG.info("init kafka consumer, topic:{}, serverUrl:{}", topic, serverUrl);
+
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mqConfig.getKafkaGroupId());
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, mqConfig.getEnableAutoCommit());
+        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, mqConfig.getAutoCommitIntervalMs());
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, mqConfig.getAutoOffsetReset());
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        consumer = new KafkaConsumer<>(properties);
+        consumer.subscribe(Collections.singleton(topic));
+    }
+
+    /**
+     * Poll loop: fetches record batches and forwards each UTF-8 message body to handleMessage().
+     */
+    public class Fetcher implements Runnable {
+
+        private final KafkaConsumer<String, byte[]> consumer;
+        private final String topic;
+        private final boolean isAutoCommit;
+        private final long fetchWaitMs;
+
+        public Fetcher(KafkaConsumer<String, byte[]> consumer, String topic, boolean isAutoCommit, long fetchWaitMs) {
+            this.consumer = consumer;
+            this.topic = topic;
+            this.isAutoCommit = isAutoCommit;
+            this.fetchWaitMs = fetchWaitMs;
+        }
+
+        @Override
+        public void run() {
+            // BUGFIX: honor thread interruption so the fetcher can be stopped;
+            // the original while(true) swallowed every exception and never exited.
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    // wait up to fetchWaitMs for a batch of records
+                    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(fetchWaitMs));
+                    if (records != null && !records.isEmpty()) {
+                        for (ConsumerRecord<String, byte[]> record : records) {
+                            if (StringUtils.equals(record.topic(), topic)) {
+                                String body = new String(record.value(), StandardCharsets.UTF_8);
+                                handleMessage(body);
+                            }
+                        }
+
+                        if (!isAutoCommit) {
+                            consumer.commitAsync();
+                        }
+                    }
+                } catch (Exception e) {
+                    // BUGFIX: log the full stack trace instead of only e.getMessage()
+                    LOG.error("kafka consumer get message error", e);
+                }
+            }
+        }
+    }
+}
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
new file mode 100644
index 000000000..bb2749206
--- /dev/null
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ClickHouseService;
+import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.service.MySqlService;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class KafkaConsumeTest {
+
+    // FIX: parameterize the raw KafkaConsumer/ConsumerRecords types
+    private KafkaConsumer<String, byte[]> consumer;
+    private AuditDataDao auditDataDao;
+    private ElasticsearchService esService;
+    private ClickHouseConfig ckConfig;
+    private StoreConfig storeConfig;
+    private MessageQueueConfig mqConfig;
+    private String topic = "inlong-audit";
+    private ConsumerRecords<String, byte[]> records;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() {
+        consumer = mock(KafkaConsumer.class);
+        consumer.subscribe(Collections.singleton(topic));
+        records = mock(ConsumerRecords.class);
+        // NOTE(review): only poll(100ms) is stubbed, matching the fetchWaitMs value
+        // passed to the Fetcher below -- keep the two in sync if either changes
+        when(consumer.poll(Duration.ofMillis(100))).thenReturn(records);
+    }
+
+    /**
+     * Smoke-test the Kafka fetcher thread: run it against the mocked consumer,
+     * then interrupt it. No assertion beyond "does not throw on start".
+     */
+    @Test
+    public void testConsumer() {
+        List<InsertData> insertServiceList = this.getInsertServiceList();
+        Thread consumeFetch = new Thread(new KafkaConsume(insertServiceList, storeConfig, mqConfig).new Fetcher(
+                consumer, topic, true, 100), "Fetch_Thread");
+        consumeFetch.start();
+        consumeFetch.interrupt();
+    }
+
+    /**
+     * Build the insert-service list handed to the consumer.
+     *
+     * @return InsertData list (MySQL, Elasticsearch, ClickHouse services)
+     */
+    private List<InsertData> getInsertServiceList() {
+        List<InsertData> insertData = new ArrayList<>();
+        insertData.add(new MySqlService(auditDataDao));
+        insertData.add(esService);
+        insertData.add(new ClickHouseService(ckConfig));
+        return insertData;
+    }
+}
diff --git a/inlong-audit/audit-store/src/test/resources/application-test.properties b/inlong-audit/audit-store/src/test/resources/application-test.properties
index 2184995c3..2848b93c6 100644
--- a/inlong-audit/audit-store/src/test/resources/application-test.properties
+++ b/inlong-audit/audit-store/src/test/resources/application-test.properties
@@ -50,7 +50,7 @@ audit.config.manager.server.url=http://127.0.0.1:8000
 # store.server: elasticsearch / mysql
 audit.config.store.mode=elasticsearch
 
-# proxy.type: pulsar / tube
+# proxy.type: pulsar / tube / kafka
 audit.config.proxy.type=pulsar
 
 # pulsar config
@@ -63,6 +63,12 @@ audit.tube.masterlist=127.0.0.1:8715
 audit.tube.topic=inlong-audit
 audit.tube.consumer.group.name=inlong-audit-consumer
 
+# kafka config
+audit.kafka.server.url=127.0.0.1:9092
+audit.kafka.topic=inlong-audit
+audit.kafka.consumer.name=inlong-audit-consumer
+audit.kafka.group.id=audit-consumer-group
+
 # es config
 elasticsearch.host=127.0.0.1
 elasticsearch.port=9200
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 5bfc3b230..b2027f73d 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -42,7 +42,7 @@ spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20
 mybatis.mapper-locations=classpath*:mapper/*.xml
 mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
 
-# proxy.type: pulsar / tube
+# proxy.type: pulsar / tube / kafka
 audit.config.proxy.type=pulsar
 
 # store.server: mysql / clickhouse / elasticsearch
@@ -60,6 +60,12 @@ audit.tube.masterlist=127.0.0.1:8715
 audit.tube.topic=inlong-audit
 audit.tube.consumer.group.name=inlong-audit-consumer
 
+# kafka config
+audit.kafka.server.url=127.0.0.1:9092
+audit.kafka.topic=inlong-audit
+audit.kafka.consumer.name=inlong-audit-consumer
+audit.kafka.group.id=audit-consumer-group
+
 # es config
 elasticsearch.host=127.0.0.1
 elasticsearch.port=9200