You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 07:30:00 UTC

[incubator-inlong] branch master updated: [INLONG-3159][Audit] Store support TubeMQ (#3227)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 37391ce  [INLONG-3159][Audit] Store support TubeMQ (#3227)
37391ce is described below

commit 37391ce5fe28548b8d53d56ec0b4095f1dd4ece8
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Mar 21 15:29:56 2022 +0800

    [INLONG-3159][Audit] Store support TubeMQ (#3227)
---
 inlong-audit/audit-store/pom.xml                   |  42 ++++--
 .../{PulsarConfig.java => MessageQueueConfig.java} |  32 +++-
 .../audit/service/AuditMsgConsumerServer.java      | 168 +++------------------
 .../inlong/audit/service/consume/BaseConsume.java  |  86 +++++++++++
 .../PulsarConsume.java}                            | 113 +++++---------
 .../inlong/audit/service/consume/TubeConsume.java  | 142 +++++++++++++++++
 .../PulsarConsumeTest.java}                        |  12 +-
 .../audit/service/consume/TubeConsumeTest.java     |  62 ++++++++
 .../src/test/resources/application-test.properties |  12 +-
 inlong-audit/conf/application.properties           |  19 ++-
 10 files changed, 430 insertions(+), 258 deletions(-)

diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index de901db..57ea3c2 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -17,9 +17,9 @@
     specific language governing permissions and limitations
     under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://maven.apache.org/POM/4.0.0"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.inlong</groupId>
         <artifactId>inlong-audit</artifactId>
@@ -163,6 +163,17 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>tubemq-client</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
     <build>
         <resources>
@@ -240,7 +251,8 @@
                         <archive>
                             <manifest>
                                 <mainClass>${start-class}</mainClass>
-                                <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                                <addDefaultImplementationEntries>true
+                                </addDefaultImplementationEntries>
                             </manifest>
                         </archive>
                     </configuration>
@@ -252,7 +264,8 @@
                         <archive>
                             <manifest>
                                 <mainClass>${start-class}</mainClass>
-                                <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+                                <addDefaultImplementationEntries>true
+                                </addDefaultImplementationEntries>
                             </manifest>
                         </archive>
                     </configuration>
@@ -281,7 +294,9 @@
                         <verbose>true</verbose>
                         <dateFormat>yyyy-MM-dd'T'HH:mm:ssZ</dateFormat>
                         <generateGitPropertiesFile>true</generateGitPropertiesFile>
-                        <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
+                        <generateGitPropertiesFilename>
+                            ${project.build.outputDirectory}/git.properties
+                        </generateGitPropertiesFilename>
                     </configuration>
                 </plugin>
                 <plugin>
@@ -332,17 +347,22 @@
                             </goals>
                             <configuration>
                                 <transformers>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.handlers</resource>
                                     </transformer>
-                                    <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
+                                    <transformer
+                                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                         <resource>META-INF/spring.factories</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                    <transformer
+                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
                                 </transformers>
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/PulsarConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
similarity index 68%
rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/PulsarConfig.java
rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index 31bf285..64c80cd 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/PulsarConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -25,14 +25,15 @@ import org.springframework.stereotype.Component;
 @Component
 @Getter
 @Setter
-public class PulsarConfig {
-    @Value("${audit.pulsar.server.url}")
+public class MessageQueueConfig {
+
+    @Value("${audit.pulsar.server.url:}")
     private String pulsarServerUrl;
 
-    @Value("${audit.pulsar.topic}")
+    @Value("${audit.pulsar.topic:}")
     private String pulsarTopic;
 
-    @Value("${audit.pulsar.consumer.sub.name}")
+    @Value("${audit.pulsar.consumer.sub.name:}")
     private String pulsarConsumerSubName;
 
     @Value("${audit.pulsar.consumer.enable.retry:false}")
@@ -47,4 +48,27 @@ public class PulsarConfig {
     @Value("${audit.pulsar.client.concurrent.consumer.num:1}")
     private int concurrentConsumerNum = 1;
 
+    @Value("${audit.tube.masterlist:}")
+    private String tubeMasterList;
+
+    @Value("${audit.tube.topic:}")
+    private String tubeTopic;
+
+    @Value("${audit.tube.consumer.group.name:}")
+    private String tubeConsumerGroupName;
+
+    @Value("${audit.tube.consumer.thread.num:4}")
+    private int tubeThreadNum;
+
+    @Value("${audit.config.proxy.type:pulsar}")
+    private String mqType;
+
+    public boolean isPulsar() {
+        return mqType.trim().equalsIgnoreCase("pulsar");
+    }
+
+    public boolean isTube() {
+        return mqType.trim().equalsIgnoreCase("tube");
+    }
+
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index a98054b..bb3098c 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,42 +17,24 @@
 
 package org.apache.inlong.audit.service;
 
-import com.google.gson.Gson;
-import org.apache.commons.lang.StringUtils;
-import org.apache.inlong.audit.config.PulsarConfig;
+import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.db.entities.ESDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.inlong.audit.service.consume.BaseConsume;
+import org.apache.inlong.audit.service.consume.PulsarConsume;
+import org.apache.inlong.audit.service.consume.TubeConsume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
 @Service
 public class AuditMsgConsumerServer implements InitializingBean {
 
     private static final Logger LOG = LoggerFactory.getLogger(AuditMsgConsumerServer.class);
-    private final Gson gson = new Gson();
-    private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap = new ConcurrentHashMap<>();
-
     @Autowired
-    private PulsarConfig pulsarConfig;
+    private MessageQueueConfig mqConfig;
     @Autowired
     private AuditDataDao auditDataDao;
     @Autowired
@@ -61,136 +43,24 @@ public class AuditMsgConsumerServer implements InitializingBean {
     private StoreConfig storeConfig;
 
     public void afterPropertiesSet() {
-        PulsarClient pulsarClient = getOrCreatePulsarClient(pulsarConfig.getPulsarServerUrl());
-        if (storeConfig.isElasticsearchStore()) {
-            esService.startTimerRoutine();
-        }
-        updateConcurrentConsumer(pulsarClient);
-    }
-
-    private PulsarClient getOrCreatePulsarClient(String pulsarServerUrl) {
-        LOG.info("start consumer pulsarServerUrl = {}", pulsarServerUrl);
-        PulsarClient pulsarClient = null;
-        try {
-            pulsarClient = PulsarClient.builder().serviceUrl(pulsarServerUrl)
-                    .connectionTimeout(pulsarConfig.getClientOperationTimeoutSecond(),
-                            TimeUnit.SECONDS).build();
-        } catch (PulsarClientException e) {
-            LOG.error("getOrCreatePulsarClient has pulsar {} err {}", pulsarServerUrl, e);
-        }
-        return pulsarClient;
-    }
-
-    protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
-        List<Consumer<byte[]>> list =
-                topicConsumerMap.computeIfAbsent(pulsarConfig.getPulsarTopic(),
-                        key -> new ArrayList<Consumer<byte[]>>());
-        int currentConsumerNum = list.size();
-        int createNum = pulsarConfig.getConcurrentConsumerNum() - currentConsumerNum;
-        /*
-         * add consumer
-         */
-        if (createNum > 0) {
-            for (int i = 0; i < pulsarConfig.getConcurrentConsumerNum(); i++) {
-                Consumer<byte[]> consumer = createConsumer(pulsarClient, pulsarConfig.getPulsarTopic());
-                if (consumer != null) {
-                    list.add(consumer);
-                }
-            }
-        } else if (createNum < 0) {
-            /*
-             * delete consumer
-             */
-            int removeIndex = currentConsumerNum - 1;
-            for (int i = createNum; i < 0; i++) {
-                Consumer<byte[]> consumer = list.remove(removeIndex);
-                consumer.closeAsync();
-                removeIndex -= 1;
-            }
+        BaseConsume mqConsume;
+        if (mqConfig.isPulsar()) {
+            mqConsume = new PulsarConsume(auditDataDao, esService, storeConfig, mqConfig);
+        } else if (mqConfig.isTube()) {
+            mqConsume = new TubeConsume(auditDataDao, esService, storeConfig, mqConfig);
+        } else {
+            LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
+            return;
         }
-    }
 
-    protected Consumer<byte[]> createConsumer(PulsarClient pulsarClient, String topic) {
-        Consumer<byte[]> consumer = null;
-        if (pulsarClient != null && StringUtils.isNotEmpty(topic)) {
-            LOG.info("createConsumer has topic {}, subName {}", topic,
-                    pulsarConfig.getPulsarConsumerSubName());
-            try {
-                consumer = pulsarClient.newConsumer()
-                        .subscriptionName(pulsarConfig.getPulsarConsumerSubName())
-                        .subscriptionType(SubscriptionType.Shared)
-                        .topic(topic)
-                        .receiverQueueSize(pulsarConfig.getConsumerReceiveQueueSize())
-                        .enableRetry(pulsarConfig.isPulsarConsumerEnableRetry())
-                        .messageListener(new MessageListener<byte[]>() {
-                            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
-                                try {
-                                    handleMessage(msg);
-                                    consumer.acknowledge(msg);
-                                } catch (Exception e) {
-                                    LOG.error("Consumer has exception topic {}, subName {}, ex {}",
-                                            topic,
-                                            pulsarConfig.getPulsarConsumerSubName(),
-                                            e);
-                                    if (pulsarConfig.isPulsarConsumerEnableRetry()) {
-                                        try {
-                                            consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
-                                        } catch (PulsarClientException pulsarClientException) {
-                                            LOG.error("Consumer reconsumeLater has exception "
-                                                            + "topic {}, subName {}, ex {}",
-                                                    topic,
-                                                    pulsarConfig.getPulsarConsumerSubName(),
-                                                    pulsarClientException);
-                                        }
-                                    } else {
-                                        consumer.negativeAcknowledge(msg);
-                                    }
-                                }
-                            }
-                        })
-                        .subscribe();
-            } catch (PulsarClientException e) {
-                LOG.error("createConsumer has topic {}, subName {}, err {}", topic,
-                        pulsarConfig.getPulsarConsumerSubName(), e);
-            }
+        if (storeConfig.isElasticsearchStore()) {
+            esService.startTimerRoutine();
         }
-        return consumer;
-    }
 
-    protected void handleMessage(Message<byte[]> msg) throws Exception {
-        String body = new String(msg.getData(), StandardCharsets.UTF_8);
-        AuditData msgBody = gson.fromJson(body, AuditData.class);
-        if (storeConfig.isMysqlStore()) {
-            AuditDataPo po = new AuditDataPo();
-            po.setIp(msgBody.getIp());
-            po.setThreadId(msgBody.getThreadId());
-            po.setDockerId(msgBody.getDockerId());
-            po.setPacketId(msgBody.getPacketId());
-            po.setSdkTs(new Date(msgBody.getSdkTs()));
-            po.setLogTs(new Date(msgBody.getLogTs()));
-            po.setAuditId(msgBody.getAuditId());
-            po.setCount(msgBody.getCount());
-            po.setDelay(msgBody.getDelay());
-            po.setInlongGroupId(msgBody.getInlongGroupId());
-            po.setInlongStreamId(msgBody.getInlongStreamId());
-            po.setSize(msgBody.getSize());
-            auditDataDao.insert(po);
-        }
-        if (storeConfig.isElasticsearchStore()) {
-            ESDataPo esPo = new ESDataPo();
-            esPo.setIp(msgBody.getIp());
-            esPo.setThreadId(msgBody.getThreadId());
-            esPo.setDockerId(msgBody.getDockerId());
-            esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
-            esPo.setLogTs(new Date(msgBody.getLogTs()));
-            esPo.setAuditId(msgBody.getAuditId());
-            esPo.setCount(msgBody.getCount());
-            esPo.setDelay(msgBody.getDelay());
-            esPo.setInlongGroupId(msgBody.getInlongGroupId());
-            esPo.setInlongStreamId(msgBody.getInlongStreamId());
-            esPo.setSize(msgBody.getSize());
-            esPo.setPacketId(msgBody.getPacketId());
-            esService.insertData(esPo);
+        if (mqConsume != null) {
+            mqConsume.start();
+        } else {
+            LOG.error("fail to auditMsgConsumerServer");
         }
     }
 
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
new file mode 100644
index 0000000..f73226b
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import com.google.gson.Gson;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.db.entities.AuditDataPo;
+import org.apache.inlong.audit.db.entities.ESDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.service.ElasticsearchService;
+
+import java.util.Date;
+
+public abstract class BaseConsume {
+
+    private final Gson gson = new Gson();
+
+    protected AuditDataDao auditDataDao;
+    protected ElasticsearchService esService;
+    protected StoreConfig storeConfig;
+    protected MessageQueueConfig mqConfig;
+
+    public BaseConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+            MessageQueueConfig mqConfig) {
+        this.auditDataDao = auditDataDao;
+        this.esService = esService;
+        this.storeConfig = storeConfig;
+        this.mqConfig = mqConfig;
+    }
+
+    public abstract void start();
+
+    protected void handleMessage(String body) throws Exception {
+        AuditData msgBody = gson.fromJson(body, AuditData.class);
+        if (storeConfig.isMysqlStore()) {
+            AuditDataPo po = new AuditDataPo();
+            po.setIp(msgBody.getIp());
+            po.setThreadId(msgBody.getThreadId());
+            po.setDockerId(msgBody.getDockerId());
+            po.setPacketId(msgBody.getPacketId());
+            po.setSdkTs(new Date(msgBody.getSdkTs()));
+            po.setLogTs(new Date(msgBody.getLogTs()));
+            po.setAuditId(msgBody.getAuditId());
+            po.setCount(msgBody.getCount());
+            po.setDelay(msgBody.getDelay());
+            po.setInlongGroupId(msgBody.getInlongGroupId());
+            po.setInlongStreamId(msgBody.getInlongStreamId());
+            po.setSize(msgBody.getSize());
+            auditDataDao.insert(po);
+        }
+        if (storeConfig.isElasticsearchStore()) {
+            ESDataPo esPo = new ESDataPo();
+            esPo.setIp(msgBody.getIp());
+            esPo.setThreadId(msgBody.getThreadId());
+            esPo.setDockerId(msgBody.getDockerId());
+            esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
+            esPo.setLogTs(new Date(msgBody.getLogTs()));
+            esPo.setAuditId(msgBody.getAuditId());
+            esPo.setCount(msgBody.getCount());
+            esPo.setDelay(msgBody.getDelay());
+            esPo.setInlongGroupId(msgBody.getInlongGroupId());
+            esPo.setInlongStreamId(msgBody.getInlongStreamId());
+            esPo.setSize(msgBody.getSize());
+            esPo.setPacketId(msgBody.getPacketId());
+            esService.insertData(esPo);
+        }
+    }
+
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
similarity index 57%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
index a98054b..90e91ec 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
@@ -15,16 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.service.consume;
 
-import com.google.gson.Gson;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
-import org.apache.inlong.audit.config.PulsarConfig;
+import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.db.entities.ESDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.service.ElasticsearchService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
@@ -33,38 +31,32 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-@Service
-public class AuditMsgConsumerServer implements InitializingBean {
+public class PulsarConsume extends BaseConsume {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AuditMsgConsumerServer.class);
-    private final Gson gson = new Gson();
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarConsume.class);
     private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap = new ConcurrentHashMap<>();
 
-    @Autowired
-    private PulsarConfig pulsarConfig;
-    @Autowired
-    private AuditDataDao auditDataDao;
-    @Autowired
-    private ElasticsearchService esService;
-    @Autowired
-    private StoreConfig storeConfig;
+    public PulsarConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+            MessageQueueConfig mqConfig) {
+        super(auditDataDao, esService, storeConfig, mqConfig);
+    }
 
-    public void afterPropertiesSet() {
-        PulsarClient pulsarClient = getOrCreatePulsarClient(pulsarConfig.getPulsarServerUrl());
-        if (storeConfig.isElasticsearchStore()) {
-            esService.startTimerRoutine();
-        }
+    @Override
+    public void start() {
+        String pulsarUrl = mqConfig.getPulsarServerUrl();
+        Preconditions.checkArgument(StringUtils.isNotEmpty(pulsarUrl), "no pulsar server url specified");
+        Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getPulsarTopic()),
+                "no pulsar topic specified");
+        Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getPulsarConsumerSubName()),
+                "no pulsar consumeSubName specified");
+        PulsarClient pulsarClient = getOrCreatePulsarClient(pulsarUrl);
         updateConcurrentConsumer(pulsarClient);
     }
 
@@ -73,7 +65,7 @@ public class AuditMsgConsumerServer implements InitializingBean {
         PulsarClient pulsarClient = null;
         try {
             pulsarClient = PulsarClient.builder().serviceUrl(pulsarServerUrl)
-                    .connectionTimeout(pulsarConfig.getClientOperationTimeoutSecond(),
+                    .connectionTimeout(mqConfig.getClientOperationTimeoutSecond(),
                             TimeUnit.SECONDS).build();
         } catch (PulsarClientException e) {
             LOG.error("getOrCreatePulsarClient has pulsar {} err {}", pulsarServerUrl, e);
@@ -83,16 +75,16 @@ public class AuditMsgConsumerServer implements InitializingBean {
 
     protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
         List<Consumer<byte[]>> list =
-                topicConsumerMap.computeIfAbsent(pulsarConfig.getPulsarTopic(),
+                topicConsumerMap.computeIfAbsent(mqConfig.getPulsarTopic(),
                         key -> new ArrayList<Consumer<byte[]>>());
         int currentConsumerNum = list.size();
-        int createNum = pulsarConfig.getConcurrentConsumerNum() - currentConsumerNum;
+        int createNum = mqConfig.getConcurrentConsumerNum() - currentConsumerNum;
         /*
          * add consumer
          */
         if (createNum > 0) {
-            for (int i = 0; i < pulsarConfig.getConcurrentConsumerNum(); i++) {
-                Consumer<byte[]> consumer = createConsumer(pulsarClient, pulsarConfig.getPulsarTopic());
+            for (int i = 0; i < mqConfig.getConcurrentConsumerNum(); i++) {
+                Consumer<byte[]> consumer = createConsumer(pulsarClient, mqConfig.getPulsarTopic());
                 if (consumer != null) {
                     list.add(consumer);
                 }
@@ -114,32 +106,33 @@ public class AuditMsgConsumerServer implements InitializingBean {
         Consumer<byte[]> consumer = null;
         if (pulsarClient != null && StringUtils.isNotEmpty(topic)) {
             LOG.info("createConsumer has topic {}, subName {}", topic,
-                    pulsarConfig.getPulsarConsumerSubName());
+                    mqConfig.getPulsarConsumerSubName());
             try {
                 consumer = pulsarClient.newConsumer()
-                        .subscriptionName(pulsarConfig.getPulsarConsumerSubName())
+                        .subscriptionName(mqConfig.getPulsarConsumerSubName())
                         .subscriptionType(SubscriptionType.Shared)
                         .topic(topic)
-                        .receiverQueueSize(pulsarConfig.getConsumerReceiveQueueSize())
-                        .enableRetry(pulsarConfig.isPulsarConsumerEnableRetry())
+                        .receiverQueueSize(mqConfig.getConsumerReceiveQueueSize())
+                        .enableRetry(mqConfig.isPulsarConsumerEnableRetry())
                         .messageListener(new MessageListener<byte[]>() {
                             public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                                 try {
-                                    handleMessage(msg);
+                                    String body = new String(msg.getData(), StandardCharsets.UTF_8);
+                                    handleMessage(body);
                                     consumer.acknowledge(msg);
                                 } catch (Exception e) {
                                     LOG.error("Consumer has exception topic {}, subName {}, ex {}",
                                             topic,
-                                            pulsarConfig.getPulsarConsumerSubName(),
+                                            mqConfig.getPulsarConsumerSubName(),
                                             e);
-                                    if (pulsarConfig.isPulsarConsumerEnableRetry()) {
+                                    if (mqConfig.isPulsarConsumerEnableRetry()) {
                                         try {
                                             consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
                                         } catch (PulsarClientException pulsarClientException) {
                                             LOG.error("Consumer reconsumeLater has exception "
                                                             + "topic {}, subName {}, ex {}",
                                                     topic,
-                                                    pulsarConfig.getPulsarConsumerSubName(),
+                                                    mqConfig.getPulsarConsumerSubName(),
                                                     pulsarClientException);
                                         }
                                     } else {
@@ -151,47 +144,9 @@ public class AuditMsgConsumerServer implements InitializingBean {
                         .subscribe();
             } catch (PulsarClientException e) {
                 LOG.error("createConsumer has topic {}, subName {}, err {}", topic,
-                        pulsarConfig.getPulsarConsumerSubName(), e);
+                        mqConfig.getPulsarConsumerSubName(), e);
             }
         }
         return consumer;
     }
-
-    protected void handleMessage(Message<byte[]> msg) throws Exception {
-        String body = new String(msg.getData(), StandardCharsets.UTF_8);
-        AuditData msgBody = gson.fromJson(body, AuditData.class);
-        if (storeConfig.isMysqlStore()) {
-            AuditDataPo po = new AuditDataPo();
-            po.setIp(msgBody.getIp());
-            po.setThreadId(msgBody.getThreadId());
-            po.setDockerId(msgBody.getDockerId());
-            po.setPacketId(msgBody.getPacketId());
-            po.setSdkTs(new Date(msgBody.getSdkTs()));
-            po.setLogTs(new Date(msgBody.getLogTs()));
-            po.setAuditId(msgBody.getAuditId());
-            po.setCount(msgBody.getCount());
-            po.setDelay(msgBody.getDelay());
-            po.setInlongGroupId(msgBody.getInlongGroupId());
-            po.setInlongStreamId(msgBody.getInlongStreamId());
-            po.setSize(msgBody.getSize());
-            auditDataDao.insert(po);
-        }
-        if (storeConfig.isElasticsearchStore()) {
-            ESDataPo esPo = new ESDataPo();
-            esPo.setIp(msgBody.getIp());
-            esPo.setThreadId(msgBody.getThreadId());
-            esPo.setDockerId(msgBody.getDockerId());
-            esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
-            esPo.setLogTs(new Date(msgBody.getLogTs()));
-            esPo.setAuditId(msgBody.getAuditId());
-            esPo.setCount(msgBody.getCount());
-            esPo.setDelay(msgBody.getDelay());
-            esPo.setInlongGroupId(msgBody.getInlongGroupId());
-            esPo.setInlongStreamId(msgBody.getInlongStreamId());
-            esPo.setSize(msgBody.getSize());
-            esPo.setPacketId(msgBody.getPacketId());
-            esService.insertData(esPo);
-        }
-    }
-
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
new file mode 100644
index 0000000..42c0c06
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Audit message consumer backed by TubeMQ.
+ *
+ * <p>{@link #start()} validates the TubeMQ settings, creates a single pull consumer subscribed to
+ * the configured topic, and starts the configured number of fetcher threads sharing that consumer.
+ */
+public class TubeConsume extends BaseConsume {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TubeConsume.class);
+    private PullMessageConsumer pullConsumer;
+    private TubeMultiSessionFactory sessionFactory;
+    private String masterUrl;
+    private String topic;
+    private int fetchThreadCnt = 4;
+
+    public TubeConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+            MessageQueueConfig mqConfig) {
+        super(auditDataDao, esService, storeConfig, mqConfig);
+    }
+
+    /**
+     * Validates the TubeMQ configuration, initializes the pull consumer and starts the fetcher threads.
+     *
+     * <p>If the consumer cannot be initialized the failure is logged and no fetcher thread is started.
+     *
+     * @throws IllegalArgumentException if the master list, topic or consumer group name is empty, or
+     *         the configured thread number is not positive
+     */
+    @Override
+    public void start() {
+        masterUrl = mqConfig.getTubeMasterList();
+        Preconditions.checkArgument(StringUtils.isNotEmpty(masterUrl), "no tube masterUrlList specified");
+        topic = mqConfig.getTubeTopic();
+        Preconditions.checkArgument(StringUtils.isNotEmpty(topic), "no tube topic specified");
+        fetchThreadCnt = mqConfig.getTubeThreadNum();
+        Preconditions.checkArgument(fetchThreadCnt > 0, "tube thread num must be greater than 0");
+        Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getTubeConsumerGroupName()),
+                "no tube consumer groupName specified");
+
+        initConsumer();
+        if (pullConsumer == null) {
+            // starting fetchers on a missing consumer would only make every thread die with an NPE
+            LOG.error("tube consumer is not initialized, no fetcher thread is started, topic:{}", topic);
+            return;
+        }
+
+        for (int i = 0; i < fetchThreadCnt; i++) {
+            new Thread(new Fetcher(pullConsumer, topic), "TubeConsume_Fetcher_Thread_" + i).start();
+        }
+    }
+
+    /**
+     * Creates the session factory and a pull consumer subscribed to {@code topic}, consuming from the
+     * latest offset. {@code pullConsumer} is only assigned once the subscription completed, so it stays
+     * {@code null} when any step fails.
+     */
+    private void initConsumer() {
+        LOG.info("init tube consumer, topic:{}, masterList:{}", topic, masterUrl);
+        ConsumerConfig consumerConfig = new ConsumerConfig(masterUrl, mqConfig.getTubeConsumerGroupName());
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        try {
+            sessionFactory = new TubeMultiSessionFactory(consumerConfig);
+            PullMessageConsumer consumer = sessionFactory.createPullConsumer(consumerConfig);
+            consumer.subscribe(topic, null);
+            consumer.completeSubscribe();
+            pullConsumer = consumer;
+        } catch (TubeClientException e) {
+            // pass the exception itself so the stack trace is not lost
+            LOG.error("init tube consumer error, topic:{}, masterList:{}", topic, masterUrl, e);
+        }
+    }
+
+    /**
+     * Pulls message batches from the shared consumer until it is shut down and forwards the body of
+     * every message of the expected topic to the inherited {@code handleMessage}.
+     */
+    public class Fetcher implements Runnable {
+
+        private final PullMessageConsumer pullMessageConsumer;
+        private final String topic;
+
+        public Fetcher(PullMessageConsumer pullMessageConsumer, String topic) {
+            this.pullMessageConsumer = pullMessageConsumer;
+            this.topic = topic;
+        }
+
+        @Override
+        public void run() {
+            // wait until the partitions are ready; give up waiting if the consumer is shut down
+            while (!pullMessageConsumer.isPartitionsReady(5000)) {
+                if (pullMessageConsumer.isShutdown()) {
+                    break;
+                }
+                LOG.warn("tube partition is not ready, keep waiting, topic:{}", topic);
+            }
+
+            // consume messages
+            while (!pullMessageConsumer.isShutdown()) {
+                try {
+                    ConsumerResult csmResult = pullMessageConsumer.getMessage();
+                    if (!csmResult.isSuccess()) {
+                        LOG.error("receive messages errorCode is {}, error message is {}",
+                                csmResult.getErrCode(), csmResult.getErrMsg());
+                        continue;
+                    }
+                    // NOTE(review): if a handler throws, this batch is never confirmed - verify that
+                    // leaving it to the broker-side handling of unconfirmed batches is intended
+                    dispatch(csmResult.getMessageList());
+                    pullMessageConsumer.confirmConsume(csmResult.getConfirmContext(), true);
+                } catch (TubeClientException e) {
+                    LOG.error("tube consumer pull or confirm error", e);
+                } catch (Exception e) {
+                    LOG.error("handle audit message error", e);
+                }
+            }
+            LOG.warn("consumer is shutdown!");
+        }
+
+        /** Decodes each message of this fetcher's topic as UTF-8 and passes it to {@code handleMessage}. */
+        private void dispatch(List<Message> messageList) throws Exception {
+            if (CollectionUtils.isEmpty(messageList)) {
+                return;
+            }
+            for (Message message : messageList) {
+                if (StringUtils.equals(message.getTopic(), topic)) {
+                    handleMessage(new String(message.getData(), StandardCharsets.UTF_8));
+                }
+            }
+        }
+    }
+}
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/AuditStoreTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
similarity index 87%
rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/AuditStoreTest.java
rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
index e75ae33..33e55f7 100644
--- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/AuditStoreTest.java
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.service.consume;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -31,10 +31,10 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class AuditStoreTest {
+public class PulsarConsumeTest {
 
     private PulsarClientImpl client;
-    private AuditMsgConsumerServer auditMsgConsumerServer;
+    private PulsarConsume pulsarConsume;
 
     static <T> PulsarClientImpl createPulsarClientMock() {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class, Mockito.RETURNS_DEEP_STUBS);
@@ -56,14 +56,14 @@ public class AuditStoreTest {
         clientConf.setStatsIntervalSeconds(0);
 
         Consumer<byte[]> consumer = client.newConsumer().subscribe();
-        auditMsgConsumerServer = mock(AuditMsgConsumerServer.class);
-        when(auditMsgConsumerServer.createConsumer(any(), any())).thenReturn(consumer);
+        pulsarConsume = mock(PulsarConsume.class);
+        when(pulsarConsume.createConsumer(any(), any())).thenReturn(consumer);
     }
 
     @Test
     public void testConsumer() {
         String topic = "non-persistent://public/default/audit-test";
-        auditMsgConsumerServer.createConsumer(client, topic);
+        pulsarConsume.createConsumer(client, topic);
     }
 
 }
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
new file mode 100644
index 0000000..22784cc
--- /dev/null
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link TubeConsume.Fetcher} running against a mocked pull consumer.
+ */
+public class TubeConsumeTest {
+
+    private PullMessageConsumer pullMessageConsumer;
+    private AuditDataDao auditDataDao;
+    private ElasticsearchService esService;
+    private StoreConfig storeConfig;
+    private MessageQueueConfig mqConfig;
+    private String topic = "inlong-audit";
+    private ConsumerResult consumerResult;
+
+    @Before
+    public void setUp() throws TubeClientException {
+        pullMessageConsumer = mock(PullMessageConsumer.class);
+        consumerResult = mock(ConsumerResult.class);
+        pullMessageConsumer.subscribe(topic, null);
+        pullMessageConsumer.completeSubscribe();
+        when(pullMessageConsumer.isPartitionsReady(5000)).thenReturn(true);
+        // the consumer reports shutdown, so the fetcher must leave its loops right away
+        when(pullMessageConsumer.isShutdown()).thenReturn(true);
+        when(pullMessageConsumer.getMessage()).thenReturn(consumerResult);
+        when(consumerResult.isSuccess()).thenReturn(false);
+    }
+
+    /**
+     * A fetcher started on an already shut down consumer terminates without pulling any message.
+     */
+    @Test
+    public void testConsume() throws InterruptedException, TubeClientException {
+        Thread consumeFetch = new Thread(new TubeConsume(auditDataDao, esService, storeConfig, mqConfig).new Fetcher(
+                pullMessageConsumer, topic), "fetch thread");
+        consumeFetch.start();
+        // wait for the fetcher instead of interrupting it, otherwise nothing is actually checked
+        consumeFetch.join(5000);
+
+        assertFalse("fetcher should stop once the consumer is shutdown", consumeFetch.isAlive());
+        verify(pullMessageConsumer, never()).getMessage();
+    }
+}
diff --git a/inlong-audit/audit-store/src/test/resources/application-test.properties b/inlong-audit/audit-store/src/test/resources/application-test.properties
index aaf5796..8fcde8c 100644
--- a/inlong-audit/audit-store/src/test/resources/application-test.properties
+++ b/inlong-audit/audit-store/src/test/resources/application-test.properties
@@ -16,16 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 # datasource config, set org.postgresql.Driver if using PostgreSQL
 spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
 spring.datasource.druid.driver-class-name=org.h2.Driver
 spring.datasource.schema=classpath:sql/apache_inlong_audit.sql
-
 spring.datasource.druid.url=jdbc:h2:mem:test;MODE=MYSQL;DB_CLOSE_DELAY=-1;IGNORECASE=TRUE;
 spring.datasource.druid.username=root
 spring.datasource.druid.password=""
-
 spring.datasource.druid.filters=stat,log4j,config
 spring.datasource.druid.max-active=100
 spring.datasource.druid.initial-size=1
@@ -49,14 +46,23 @@ mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
 # audit config
 audit.config.file.check.enable=false
 audit.config.manager.server.url=http://127.0.0.1:8000
+
 # store.server: elasticsearch / mysql
 audit.config.store.mode=elasticsearch
 
+# proxy.type: pulsar / tube
+audit.config.proxy.type=pulsar
+
 # pulsar config
 audit.pulsar.server.url=pulsar://127.0.0.1:6650
 audit.pulsar.topic=persistent://public/default/inlong-audit
 audit.pulsar.consumer.sub.name=inlong-audit-subscription
 
+# tube config
+audit.tube.masterlist=127.0.0.1:8080
+audit.tube.topic=inlong-audit
+audit.tube.consumer.group.name=inlong-audit-consumer
+
 # es config
 elasticsearch.host=127.0.0.1
 elasticsearch.port=9200
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 45830f5..09fe423 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -16,14 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 # datasource config
 # datasource config, set org.postgresql.Driver if using PostgreSQL
-spring.datasource.driver-class-name = com.mysql.jdbc.Driver
-spring.datasource.name = druidDataSource
-spring.datasource.type = com.alibaba.druid.pool.DruidDataSource
-spring.datasource.druid.driver-class-name= com.mysql.cj.jdbc.Driver
-spring.datasource.druid.url= jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
+spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.name=druidDataSource
+spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
+spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
+spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
 spring.datasource.druid.username=root
 spring.datasource.druid.password=inlong
 spring.datasource.druid.filters=stat,log4j,config
@@ -46,6 +45,9 @@ spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20
 mybatis.mapper-locations=classpath*:mapper/*.xml
 mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
 
+# proxy.type: pulsar / tube
+audit.config.proxy.type=pulsar
+
 # store.server: elasticsearch / mysql
 audit.config.store.mode=mysql
 
@@ -54,6 +56,11 @@ audit.pulsar.server.url=pulsar://127.0.0.1:6650
 audit.pulsar.topic=persistent://public/default/inlong-audit
 audit.pulsar.consumer.sub.name=inlong-audit-subscription
 
+# tube config
+audit.tube.masterlist=127.0.0.1:8000
+audit.tube.topic=inlong-audit
+audit.tube.consumer.group.name=inlong-audit-consumer
+
 # es config
 elasticsearch.host=127.0.0.1
 elasticsearch.port=9200