You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/30 02:27:15 UTC

[inlong] branch master updated: [INLONG-4741][Audit] AuditStore support ClickHouse sink (#4747)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 893a8fc9e [INLONG-4741][Audit] AuditStore support ClickHouse sink (#4747)
893a8fc9e is described below

commit 893a8fc9e5c1824cfa1b4b81c0e098b3ebb96cef
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Jun 30 10:27:11 2022 +0800

    [INLONG-4741][Audit] AuditStore support ClickHouse sink (#4747)
---
 inlong-audit/audit-store/pom.xml                   |   4 +
 .../{StoreConfig.java => ClickHouseConfig.java}    |  34 ++--
 .../apache/inlong/audit/config/StoreConfig.java    |   4 +
 .../entities/ClickHouseDataPo.java}                |  33 ++--
 .../audit/service/AuditMsgConsumerServer.java      |  35 +++-
 .../inlong/audit/service/ClickHouseService.java    | 202 +++++++++++++++++++++
 .../inlong/audit/service/ElasticsearchService.java |  31 +++-
 .../StoreConfig.java => service/InsertData.java}   |  38 ++--
 .../apache/inlong/audit/service/MySqlService.java  |  64 +++++++
 .../inlong/audit/service/consume/BaseConsume.java  |  64 +++----
 .../audit/service/consume/PulsarConsume.java       |  22 ++-
 .../inlong/audit/service/consume/TubeConsume.java  |  14 +-
 .../sql/apache_inlong_audit_clickhouse.sql         |  44 +++++
 .../audit/service/consume/TubeConsumeTest.java     |  27 ++-
 inlong-audit/conf/application.properties           |   9 +
 .../sql/apache_inlong_audit_clickhouse.sql         |  44 +++++
 licenses/inlong-audit/LICENSE                      |   1 +
 licenses/inlong-sort-standalone/LICENSE            |   1 +
 18 files changed, 553 insertions(+), 118 deletions(-)

diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 0193c2371..101b6b988 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -156,6 +156,10 @@
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
         </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
similarity index 61%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
index 71dbc27c8..df26241e6 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
@@ -17,25 +17,35 @@
 
 package org.apache.inlong.audit.config;
 
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
 import lombok.Getter;
 import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
 
-@Component
+@Configuration
 @Getter
 @Setter
-public class StoreConfig {
+public class ClickHouseConfig {
+
+    @Value("${clickhouse.driver}")
+    private String driver;
+
+    @Value("${clickhouse.url}")
+    private String url;
 
-    @Value("${audit.config.store.mode:mysql}")
-    private String store;
+    @Value("${clickhouse.username}")
+    private String username;
 
-    public boolean isMysqlStore() {
-        return store.contains("mysql");
-    }
+    @Value("${clickhouse.password}")
+    private String password;
 
-    public boolean isElasticsearchStore() {
-        return store.contains("elasticsearch");
-    }
+    @Value("${clickhouse.batchIntervalMs:1000}")
+    private int batchIntervalMs;
+    
+    @Value("${clickhouse.batchThreshold:500}")
+    private int batchThreshold;
 
+    @Value("${clickhouse.processIntervalMs:100}")
+    private int processIntervalMs;
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
index 71dbc27c8..41ab00da8 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
@@ -38,4 +38,8 @@ public class StoreConfig {
         return store.contains("elasticsearch");
     }
 
+    public boolean isClickHouseStore() {
+        return store.contains("clickhouse");
+    }
+
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
similarity index 64%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
index 71dbc27c8..50b0b676d 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
@@ -15,27 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.db.entities;
+
+import java.sql.Timestamp;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
 
-@Component
 @Getter
 @Setter
-public class StoreConfig {
-
-    @Value("${audit.config.store.mode:mysql}")
-    private String store;
-
-    public boolean isMysqlStore() {
-        return store.contains("mysql");
-    }
-
-    public boolean isElasticsearchStore() {
-        return store.contains("elasticsearch");
-    }
+public class ClickHouseDataPo {
 
+    private String ip;
+    private String dockerId;
+    private String threadId;
+    private Timestamp sdkTs;
+    private long packetId;
+    private Timestamp logTs;
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String auditId;
+    private long count;
+    private long size;
+    private long delay;
+    private Timestamp updateTime;
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 319807851..aa0acbf2d 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.audit.service;
 
+import org.apache.inlong.audit.config.ClickHouseConfig;
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.db.dao.AuditDataDao;
@@ -29,6 +30,9 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @Service
 public class AuditMsgConsumerServer implements InitializingBean {
 
@@ -41,16 +45,19 @@ public class AuditMsgConsumerServer implements InitializingBean {
     private ElasticsearchService esService;
     @Autowired
     private StoreConfig storeConfig;
+    @Autowired
+    private ClickHouseConfig chConfig;
 
     /**
      * Initializing bean
      */
     public void afterPropertiesSet() {
-        BaseConsume mqConsume;
+        BaseConsume mqConsume = null;
+        List<InsertData> insertServiceList = this.getInsertServiceList();
         if (mqConfig.isPulsar()) {
-            mqConsume = new PulsarConsume(auditDataDao, esService, storeConfig, mqConfig);
+            mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
         } else if (mqConfig.isTube()) {
-            mqConsume = new TubeConsume(auditDataDao, esService, storeConfig, mqConfig);
+            mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
         } else {
             LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
             return;
@@ -59,12 +66,24 @@ public class AuditMsgConsumerServer implements InitializingBean {
         if (storeConfig.isElasticsearchStore()) {
             esService.startTimerRoutine();
         }
+        mqConsume.start();
+    }
 
-        if (mqConsume != null) {
-            mqConsume.start();
-        } else {
-            LOG.error("fail to auditMsgConsumerServer");
+    /**
+     * getInsertServiceList
+     * @return
+     */
+    private List<InsertData> getInsertServiceList() {
+        List<InsertData> insertServiceList = new ArrayList<>();
+        if (storeConfig.isMysqlStore()) {
+            insertServiceList.add(new MySqlService(auditDataDao));
+        }
+        if (storeConfig.isElasticsearchStore()) {
+            insertServiceList.add(esService);
         }
+        if (storeConfig.isClickHouseStore()) {
+            insertServiceList.add(new ClickHouseService(chConfig));
+        }
+        return insertServiceList;
     }
-
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
new file mode 100644
index 000000000..24042d5aa
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * ClickHouseService
+ */
+public class ClickHouseService implements InsertData, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseService.class);
+    public static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id,\r\n"
+            + "      sdk_ts, packet_id, log_ts,\r\n"
+            + "      inlong_group_id, inlong_stream_id, audit_id,\r\n"
+            + "      count, size, delay, \r\n"
+            + "      update_time)\r\n"
+            + "    values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    private ClickHouseConfig chConfig;
+
+    private ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+    private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
+    private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
+    private AtomicInteger batchCounter = new AtomicInteger(0);
+
+    private Connection conn;
+
+    /**
+     * Constructor
+     * @param chConfig ClickHouse service config, such as jdbc url, jdbc username, jdbc password.
+     */
+    public ClickHouseService(ClickHouseConfig chConfig) {
+        this.chConfig = chConfig;
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        // queue
+        this.batchQueue = new LinkedBlockingQueue<>(
+                chConfig.getBatchThreshold() * chConfig.getBatchIntervalMs() / chConfig.getProcessIntervalMs());
+        // connection
+        try {
+            Class.forName(chConfig.getDriver());
+            this.reconnect();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        // timer
+        long currentTime = System.currentTimeMillis();
+        // batch output interval
+        timerService.scheduleWithFixedDelay(() -> needBatchOutput.compareAndSet(false, true),
+                currentTime + chConfig.getBatchIntervalMs(),
+                chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+        // batch output process
+        timerService.scheduleWithFixedDelay(() -> processOutput(),
+                currentTime + chConfig.getProcessIntervalMs(),
+                chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * processOutput
+     */
+    private void processOutput() {
+        if (!this.needBatchOutput.get()) {
+            return;
+        }
+        // output
+        try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) {
+            // insert data
+            ClickHouseDataPo data = this.batchQueue.poll();
+            int counter = 0;
+            while (data != null) {
+                pstat.setString(1, data.getIp());
+                pstat.setString(2, data.getDockerId());
+                pstat.setString(3, data.getThreadId());
+                pstat.setTimestamp(4, data.getSdkTs());
+                pstat.setLong(5, data.getPacketId());
+                pstat.setTimestamp(6, data.getLogTs());
+                pstat.setString(7, data.getInlongGroupId());
+                pstat.setString(8, data.getInlongStreamId());
+                pstat.setString(9, data.getAuditId());
+                pstat.setLong(10, data.getCount());
+                pstat.setLong(11, data.getSize());
+                pstat.setLong(12, data.getDelay());
+                pstat.setTimestamp(13, data.getUpdateTime());
+                pstat.addBatch();
+                this.batchCounter.decrementAndGet();
+                if (++counter >= chConfig.getBatchThreshold()) {
+                    pstat.executeBatch();
+                    this.conn.commit();
+                    counter = 0;
+                }
+            }
+            this.batchCounter.set(0);
+            pstat.executeBatch();
+            this.conn.commit();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            try {
+                this.reconnect();
+            } catch (SQLException e1) {
+                LOG.error(e1.getMessage(), e1);
+            }
+        }
+
+        // recover
+        this.needBatchOutput.compareAndSet(true, false);
+    }
+
+    /**
+     * reconnect
+     * @throws SQLException Exception when creating connection.
+     */
+    private void reconnect() throws SQLException {
+        if (this.conn != null) {
+            try {
+                this.conn.close();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+            this.conn = null;
+        }
+        this.conn = DriverManager.getConnection(chConfig.getUrl(), chConfig.getUsername(),
+                chConfig.getPassword());
+        this.conn.setAutoCommit(false);
+    }
+
+    /**
+     * insert
+     * @param msgBody audit data reading from Pulsar or other MessageQueue. 
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+        ClickHouseDataPo data = new ClickHouseDataPo();
+        data.setIp(msgBody.getIp());
+        data.setThreadId(msgBody.getThreadId());
+        data.setDockerId(msgBody.getDockerId());
+        data.setPacketId(msgBody.getPacketId());
+        data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
+        data.setLogTs(new Timestamp(msgBody.getLogTs()));
+        data.setAuditId(msgBody.getAuditId());
+        data.setCount(msgBody.getCount());
+        data.setDelay(msgBody.getDelay());
+        data.setInlongGroupId(msgBody.getInlongGroupId());
+        data.setInlongStreamId(msgBody.getInlongStreamId());
+        data.setSize(msgBody.getSize());
+        data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
+        try {
+            this.batchQueue.offer(data, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            if (this.batchCounter.incrementAndGet() >= chConfig.getBatchThreshold()) {
+                this.needBatchOutput.compareAndSet(false, true);
+            }
+        } catch (InterruptedException e) {
+            LOG.error(e.getMessage(), e);
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * close
+     * @throws Exception Exception when closing ClickHouse connection.
+     */
+    @Override
+    public void close() throws Exception {
+        this.conn.close();
+        this.timerService.shutdown();
+    }
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
index ca573564b..b381a4623 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
@@ -20,8 +20,10 @@ package org.apache.inlong.audit.service;
 import com.google.gson.FieldNamingPolicy;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 import org.apache.inlong.audit.config.ElasticsearchConfig;
 import org.apache.inlong.audit.db.entities.ESDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -55,7 +57,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 @Service
-public class ElasticsearchService implements AutoCloseable {
+public class ElasticsearchService implements InsertData, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchService.class);
 
@@ -71,6 +73,7 @@ public class ElasticsearchService implements AutoCloseable {
 
     public void startTimerRoutine() {
         timerService.scheduleAtFixedRate((new Runnable() {
+
             @Override
             public void run() {
                 try {
@@ -82,6 +85,7 @@ public class ElasticsearchService implements AutoCloseable {
         }), 1, 1, TimeUnit.DAYS);
 
         timerService.scheduleWithFixedDelay((new Runnable() {
+
             @Override
             public void run() {
                 try {
@@ -312,7 +316,26 @@ public class ElasticsearchService implements AutoCloseable {
         builder.endObject();
         return builder;
     }
-}
-
-
 
+    /**
+     * insert
+     * @param msgBody
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+        ESDataPo esPo = new ESDataPo();
+        esPo.setIp(msgBody.getIp());
+        esPo.setThreadId(msgBody.getThreadId());
+        esPo.setDockerId(msgBody.getDockerId());
+        esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
+        esPo.setLogTs(new Date(msgBody.getLogTs()));
+        esPo.setAuditId(msgBody.getAuditId());
+        esPo.setCount(msgBody.getCount());
+        esPo.setDelay(msgBody.getDelay());
+        esPo.setInlongGroupId(msgBody.getInlongGroupId());
+        esPo.setInlongStreamId(msgBody.getInlongStreamId());
+        esPo.setSize(msgBody.getSize());
+        esPo.setPacketId(msgBody.getPacketId());
+        this.insertData(esPo);
+    }
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
similarity index 51%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
index 71dbc27c8..61cd32826 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
@@ -1,10 +1,10 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -15,27 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.service;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
+import org.apache.inlong.audit.protocol.AuditData;
 
-@Component
-@Getter
-@Setter
-public class StoreConfig {
-
-    @Value("${audit.config.store.mode:mysql}")
-    private String store;
-
-    public boolean isMysqlStore() {
-        return store.contains("mysql");
-    }
-
-    public boolean isElasticsearchStore() {
-        return store.contains("elasticsearch");
-    }
+/**
+ * InsertData
+ * 
+ */
+public interface InsertData {
 
+    /**
+     * insert
+     * @param msgBody
+     */
+    void insert(AuditData msgBody);
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
new file mode 100644
index 000000000..e576b789c
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.db.entities.AuditDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+
+import java.util.Date;
+
+/**
+ * MySqlService
+ * 
+ */
+public class MySqlService implements InsertData {
+
+    private AuditDataDao dao;
+
+    /**
+     * Constructor
+     * @param dao
+     */
+    public MySqlService(AuditDataDao dao) {
+        this.dao = dao;
+    }
+
+    /**
+     * insert
+     * @param msgBody
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+        AuditDataPo po = new AuditDataPo();
+        po.setIp(msgBody.getIp());
+        po.setThreadId(msgBody.getThreadId());
+        po.setDockerId(msgBody.getDockerId());
+        po.setPacketId(msgBody.getPacketId());
+        po.setSdkTs(new Date(msgBody.getSdkTs()));
+        po.setLogTs(new Date(msgBody.getLogTs()));
+        po.setAuditId(msgBody.getAuditId());
+        po.setCount(msgBody.getCount());
+        po.setDelay(msgBody.getDelay());
+        po.setInlongGroupId(msgBody.getInlongGroupId());
+        po.setInlongStreamId(msgBody.getInlongStreamId());
+        po.setSize(msgBody.getSize());
+        dao.insert(po);
+    }
+
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
index f73226b0a..845c55d02 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
@@ -18,69 +18,49 @@
 package org.apache.inlong.audit.service.consume;
 
 import com.google.gson.Gson;
+
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.db.entities.ESDataPo;
 import org.apache.inlong.audit.protocol.AuditData;
-import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Date;
+import java.util.List;
 
 public abstract class BaseConsume {
 
+    private static final Logger LOG = LoggerFactory.getLogger(BaseConsume.class);
+
     private final Gson gson = new Gson();
 
-    protected AuditDataDao auditDataDao;
-    protected ElasticsearchService esService;
+    protected List<InsertData> insertServiceList;
     protected StoreConfig storeConfig;
     protected MessageQueueConfig mqConfig;
 
-    public BaseConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+    public BaseConsume(List<InsertData> insertServiceList, StoreConfig storeConfig,
             MessageQueueConfig mqConfig) {
-        this.auditDataDao = auditDataDao;
-        this.esService = esService;
+        this.insertServiceList = insertServiceList;
         this.storeConfig = storeConfig;
         this.mqConfig = mqConfig;
     }
 
     public abstract void start();
 
+    /**
+     * handleMessage
+     * @param body
+     * @throws Exception
+     */
     protected void handleMessage(String body) throws Exception {
         AuditData msgBody = gson.fromJson(body, AuditData.class);
-        if (storeConfig.isMysqlStore()) {
-            AuditDataPo po = new AuditDataPo();
-            po.setIp(msgBody.getIp());
-            po.setThreadId(msgBody.getThreadId());
-            po.setDockerId(msgBody.getDockerId());
-            po.setPacketId(msgBody.getPacketId());
-            po.setSdkTs(new Date(msgBody.getSdkTs()));
-            po.setLogTs(new Date(msgBody.getLogTs()));
-            po.setAuditId(msgBody.getAuditId());
-            po.setCount(msgBody.getCount());
-            po.setDelay(msgBody.getDelay());
-            po.setInlongGroupId(msgBody.getInlongGroupId());
-            po.setInlongStreamId(msgBody.getInlongStreamId());
-            po.setSize(msgBody.getSize());
-            auditDataDao.insert(po);
-        }
-        if (storeConfig.isElasticsearchStore()) {
-            ESDataPo esPo = new ESDataPo();
-            esPo.setIp(msgBody.getIp());
-            esPo.setThreadId(msgBody.getThreadId());
-            esPo.setDockerId(msgBody.getDockerId());
-            esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
-            esPo.setLogTs(new Date(msgBody.getLogTs()));
-            esPo.setAuditId(msgBody.getAuditId());
-            esPo.setCount(msgBody.getCount());
-            esPo.setDelay(msgBody.getDelay());
-            esPo.setInlongGroupId(msgBody.getInlongGroupId());
-            esPo.setInlongStreamId(msgBody.getInlongStreamId());
-            esPo.setSize(msgBody.getSize());
-            esPo.setPacketId(msgBody.getPacketId());
-            esService.insertData(esPo);
-        }
+        this.insertServiceList.forEach((service) -> {
+            try {
+                service.insert(msgBody);
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        });
     }
 
 }
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
index 089e3a099..d93726380 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
@@ -18,11 +18,11 @@
 package org.apache.inlong.audit.service.consume;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
@@ -45,9 +45,15 @@ public class PulsarConsume extends BaseConsume {
     private static final Logger LOG = LoggerFactory.getLogger(PulsarConsume.class);
     private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap = new ConcurrentHashMap<>();
 
-    public PulsarConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+    /**
+     * Constructor
+     * @param insertServiceList
+     * @param storeConfig
+     * @param mqConfig
+     */
+    public PulsarConsume(List<InsertData> insertServiceList, StoreConfig storeConfig,
             MessageQueueConfig mqConfig) {
-        super(auditDataDao, esService, storeConfig, mqConfig);
+        super(insertServiceList, storeConfig, mqConfig);
     }
 
     @Override
@@ -79,9 +85,8 @@ public class PulsarConsume extends BaseConsume {
     }
 
     protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
-        List<Consumer<byte[]>> list =
-                topicConsumerMap.computeIfAbsent(mqConfig.getPulsarTopic(),
-                        key -> new ArrayList<Consumer<byte[]>>());
+        List<Consumer<byte[]>> list = topicConsumerMap.computeIfAbsent(mqConfig.getPulsarTopic(),
+                key -> new ArrayList<Consumer<byte[]>>());
         int currentConsumerNum = list.size();
         int createNum = mqConfig.getConcurrentConsumerNum() - currentConsumerNum;
         /*
@@ -120,6 +125,7 @@ public class PulsarConsume extends BaseConsume {
                         .receiverQueueSize(mqConfig.getConsumerReceiveQueueSize())
                         .enableRetry(mqConfig.isPulsarConsumerEnableRetry())
                         .messageListener(new MessageListener<byte[]>() {
+
                             public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                                 try {
                                     String body = new String(msg.getData(), StandardCharsets.UTF_8);
@@ -135,7 +141,7 @@ public class PulsarConsume extends BaseConsume {
                                             consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
                                         } catch (PulsarClientException pulsarClientException) {
                                             LOG.error("Consumer reconsumeLater has exception "
-                                                            + "topic {}, subName {}, ex {}",
+                                                    + "topic {}, subName {}, ex {}",
                                                     topic,
                                                     mqConfig.getPulsarConsumerSubName(),
                                                     pulsarClientException);
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
index 37d976950..a70979fa6 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.audit.service.consume;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
@@ -46,9 +46,15 @@ public class TubeConsume extends BaseConsume {
     private String topic;
     private int fetchThreadCnt = 4;
 
-    public TubeConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+    /**
+     * Constructor
+     * @param insertServiceList
+     * @param storeConfig
+     * @param mqConfig
+     */
+    public TubeConsume(List<InsertData> insertServiceList, StoreConfig storeConfig,
             MessageQueueConfig mqConfig) {
-        super(auditDataDao, esService, storeConfig, mqConfig);
+        super(insertServiceList, storeConfig, mqConfig);
     }
 
     @Override
diff --git a/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
new file mode 100644
index 000000000..f9dd34d80
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
+SET time_zone = "+00:00";
+
+CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
+USE apache_inlong_audit;
+
+CREATE TABLE `audit_data`
+(
+    `ip`               String COMMENT 'client ip',
+    `docker_id`        String COMMENT 'client docker id',
+    `thread_id`        String COMMENT 'client thread id',
+    `sdk_ts`           DateTime COMMENT 'sdk timestamp',
+    `packet_id`        Int64 COMMENT 'packet id',
+    `log_ts`           DateTime COMMENT 'log timestamp',
+    `inlong_group_id`  String COMMENT 'inlong group id',
+    `inlong_stream_id` String COMMENT 'inlong stream id',
+    `audit_id`         String COMMENT 'audit id',
+    `count`            Int64 COMMENT 'msg count',
+    `size`             Int64 COMMENT 'msg size',
+    `delay`            Int64 COMMENT 'msg delay',
+    `update_time`       DateTime COMMENT 'update time'
+)
+ENGINE = MergeTree
+ORDER BY inlong_group_id
+SETTINGS index_granularity = 8192;
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
index 22784cca4..130156aae 100644
--- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
@@ -17,16 +17,23 @@
 
 package org.apache.inlong.audit.service.consume;
 
+import org.apache.inlong.audit.config.ClickHouseConfig;
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
 import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ClickHouseService;
 import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.service.MySqlService;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,6 +42,7 @@ public class TubeConsumeTest {
     private PullMessageConsumer pullMessageConsumer;
     private AuditDataDao auditDataDao;
     private ElasticsearchService esService;
+    private ClickHouseConfig chConfig;
     private StoreConfig storeConfig;
     private MessageQueueConfig mqConfig;
     private String topic = "inlong-audit";
@@ -52,11 +60,28 @@ public class TubeConsumeTest {
         when(consumerResult.isSuccess()).thenReturn(false);
     }
 
+    /**
+     * testConsume
+     * @throws InterruptedException
+     */
     @Test
     public void testConsume() throws InterruptedException {
-        Thread consumeFetch = new Thread(new TubeConsume(auditDataDao, esService, storeConfig, mqConfig).new Fetcher(
+        List<InsertData> insertServiceList = this.getInsertServiceList();
+        Thread consumeFetch = new Thread(new TubeConsume(insertServiceList, storeConfig, mqConfig).new Fetcher(
                 pullMessageConsumer, topic), "fetch thread");
         consumeFetch.start();
         consumeFetch.interrupt();
     }
+
+    /**
+     * getInsertServiceList
+     * @return
+     */
+    private List<InsertData> getInsertServiceList() {
+        List<InsertData> insertServiceList = new ArrayList<>();
+        insertServiceList.add(new MySqlService(auditDataDao));
+        insertServiceList.add(esService);
+        insertServiceList.add(new ClickHouseService(chConfig));
+        return insertServiceList;
+    }
 }
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index c0b0374b1..5801cdd2a 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -77,3 +77,12 @@ elasticsearch.bulkInterval=10
 elasticsearch.bulkThreshold=5000
 elasticsearch.auditIdSet=1,2
 
+# clickhouse config
+clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
+clickhouse.url=jdbc:clickhouse://127.0.0.1:8123/default
+clickhouse.username=default
+clickhouse.password=default
+clickhouse.batchIntervalMs=1000
+clickhouse.batchThreshold=500
+clickhouse.processIntervalMs=100
+
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
new file mode 100644
index 000000000..f9dd34d80
--- /dev/null
+++ b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
+SET time_zone = "+00:00";
+
+CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
+USE apache_inlong_audit;
+
+CREATE TABLE `audit_data`
+(
+    `ip`               String COMMENT 'client ip',
+    `docker_id`        String COMMENT 'client docker id',
+    `thread_id`        String COMMENT 'client thread id',
+    `sdk_ts`           DateTime COMMENT 'sdk timestamp',
+    `packet_id`        Int64 COMMENT 'packet id',
+    `log_ts`           DateTime COMMENT 'log timestamp',
+    `inlong_group_id`  String COMMENT 'inlong group id',
+    `inlong_stream_id` String COMMENT 'inlong stream id',
+    `audit_id`         String COMMENT 'audit id',
+    `count`            Int64 COMMENT 'msg count',
+    `size`             Int64 COMMENT 'msg size',
+    `delay`            Int64 COMMENT 'msg delay',
+    `update_time`       DateTime COMMENT 'update time'
+)
+ENGINE = MergeTree
+ORDER BY inlong_group_id
+SETTINGS index_granularity = 8192;
diff --git a/licenses/inlong-audit/LICENSE b/licenses/inlong-audit/LICENSE
index 01a309ad5..13aa10a47 100644
--- a/licenses/inlong-audit/LICENSE
+++ b/licenses/inlong-audit/LICENSE
@@ -642,6 +642,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
   org.springframework:spring-jcl:5.3.20 - Spring Commons Logging Bridge (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
   org.springframework:spring-jdbc:5.3.20 - Spring JDBC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
   org.springframework:spring-tx:5.3.20 - Spring Transaction (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
+  ru.yandex.clickhouse:clickhouse-jdbc:0.3.1 - clickhouse-jdbc (https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc), (The Apache Software License, Version 2.0)
 
 
 ========================================================================
diff --git a/licenses/inlong-sort-standalone/LICENSE b/licenses/inlong-sort-standalone/LICENSE
index aced6c438..38b60db9d 100644
--- a/licenses/inlong-sort-standalone/LICENSE
+++ b/licenses/inlong-sort-standalone/LICENSE
@@ -647,6 +647,7 @@ The text of each license is the standard Apache 2.0 license.
   xml-apis:xml-apis:1.4.01 - XML Commons External Components XML APIs (http://xml.apache.org/commons/components/external/), (The Apache Software License, Version 2.0), (Apache 2.0, The SAX License, The W3C License)
   org.apache.zookeeper:zookeeper:3.6.3 - Apache ZooKeeper - Server (https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-server), (Apache License, Version 2.0)
   org.apache.zookeeper:zookeeper-jute:3.6.3 - Apache ZooKeeper - Jute (https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-jute), (Apache License, Version 2.0)
+  ru.yandex.clickhouse:clickhouse-jdbc:0.3.1 - clickhouse-jdbc (https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc), (The Apache Software License, Version 2.0)
 
   
 ========================================================================