You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/30 02:27:15 UTC
[inlong] branch master updated: [INLONG-4741][Audit] AuditStore support ClickHouse sink (#4747)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 893a8fc9e [INLONG-4741][Audit] AuditStore support ClickHouse sink (#4747)
893a8fc9e is described below
commit 893a8fc9e5c1824cfa1b4b81c0e098b3ebb96cef
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Jun 30 10:27:11 2022 +0800
[INLONG-4741][Audit] AuditStore support ClickHouse sink (#4747)
---
inlong-audit/audit-store/pom.xml | 4 +
.../{StoreConfig.java => ClickHouseConfig.java} | 34 ++--
.../apache/inlong/audit/config/StoreConfig.java | 4 +
.../entities/ClickHouseDataPo.java} | 33 ++--
.../audit/service/AuditMsgConsumerServer.java | 35 +++-
.../inlong/audit/service/ClickHouseService.java | 202 +++++++++++++++++++++
.../inlong/audit/service/ElasticsearchService.java | 31 +++-
.../StoreConfig.java => service/InsertData.java} | 38 ++--
.../apache/inlong/audit/service/MySqlService.java | 64 +++++++
.../inlong/audit/service/consume/BaseConsume.java | 64 +++----
.../audit/service/consume/PulsarConsume.java | 22 ++-
.../inlong/audit/service/consume/TubeConsume.java | 14 +-
.../sql/apache_inlong_audit_clickhouse.sql | 44 +++++
.../audit/service/consume/TubeConsumeTest.java | 27 ++-
inlong-audit/conf/application.properties | 9 +
.../sql/apache_inlong_audit_clickhouse.sql | 44 +++++
licenses/inlong-audit/LICENSE | 1 +
licenses/inlong-sort-standalone/LICENSE | 1 +
18 files changed, 553 insertions(+), 118 deletions(-)
diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 0193c2371..101b6b988 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -156,6 +156,10 @@
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
+ <dependency>
+ <groupId>ru.yandex.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
similarity index 61%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
index 71dbc27c8..df26241e6 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/ClickHouseConfig.java
@@ -17,25 +17,35 @@
package org.apache.inlong.audit.config;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
import lombok.Getter;
import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-@Component
+@Configuration
@Getter
@Setter
-public class StoreConfig {
+public class ClickHouseConfig {
+
+ @Value("${clickhouse.driver}")
+ private String driver;
+
+ @Value("${clickhouse.url}")
+ private String url;
- @Value("${audit.config.store.mode:mysql}")
- private String store;
+ @Value("${clickhouse.username}")
+ private String username;
- public boolean isMysqlStore() {
- return store.contains("mysql");
- }
+ @Value("${clickhouse.password}")
+ private String password;
- public boolean isElasticsearchStore() {
- return store.contains("elasticsearch");
- }
+ @Value("${clickhouse.batchIntervalMs:1000}")
+ private int batchIntervalMs;
+
+ @Value("${clickhouse.batchThreshold:500}")
+ private int batchThreshold;
+ @Value("${clickhouse.processIntervalMs:100}")
+ private int processIntervalMs;
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
index 71dbc27c8..41ab00da8 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
@@ -38,4 +38,8 @@ public class StoreConfig {
return store.contains("elasticsearch");
}
+ public boolean isClickHouseStore() {
+ return store.contains("clickhouse"); // substring match on the configured store mode, same as the sibling checks
+ }
+
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
similarity index 64%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
index 71dbc27c8..50b0b676d 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
@@ -15,27 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.db.entities;
+
+import java.sql.Timestamp;
import lombok.Getter;
import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-@Component
@Getter
@Setter
-public class StoreConfig {
-
- @Value("${audit.config.store.mode:mysql}")
- private String store;
-
- public boolean isMysqlStore() {
- return store.contains("mysql");
- }
-
- public boolean isElasticsearchStore() {
- return store.contains("elasticsearch");
- }
+public class ClickHouseDataPo { // one record for the ClickHouse audit_data table; accessors come from Lombok
+ private String ip; // client ip
+ private String dockerId; // client docker id
+ private String threadId; // client thread id
+ private Timestamp sdkTs; // sdk timestamp
+ private long packetId; // packet id
+ private Timestamp logTs; // log timestamp
+ private String inlongGroupId; // inlong group id
+ private String inlongStreamId; // inlong stream id
+ private String auditId; // audit id
+ private long count; // msg count
+ private long size; // msg size
+ private long delay; // msg delay
+ private Timestamp updateTime; // update time
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 319807851..aa0acbf2d 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,6 +17,7 @@
package org.apache.inlong.audit.service;
+import org.apache.inlong.audit.config.ClickHouseConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.db.dao.AuditDataDao;
@@ -29,6 +30,9 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
+import java.util.List;
+
@Service
public class AuditMsgConsumerServer implements InitializingBean {
@@ -41,16 +45,19 @@ public class AuditMsgConsumerServer implements InitializingBean {
private ElasticsearchService esService;
@Autowired
private StoreConfig storeConfig;
+ @Autowired
+ private ClickHouseConfig chConfig;
/**
* Initializing bean
*/
public void afterPropertiesSet() {
- BaseConsume mqConsume;
+ BaseConsume mqConsume = null;
+ List<InsertData> insertServiceList = this.getInsertServiceList();
if (mqConfig.isPulsar()) {
- mqConsume = new PulsarConsume(auditDataDao, esService, storeConfig, mqConfig);
+ mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
} else if (mqConfig.isTube()) {
- mqConsume = new TubeConsume(auditDataDao, esService, storeConfig, mqConfig);
+ mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
} else {
LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
return;
@@ -59,12 +66,24 @@ public class AuditMsgConsumerServer implements InitializingBean {
if (storeConfig.isElasticsearchStore()) {
esService.startTimerRoutine();
}
+ mqConsume.start();
+ }
- if (mqConsume != null) {
- mqConsume.start();
- } else {
- LOG.error("fail to auditMsgConsumerServer");
+    /** Builds one sink per store mode enabled in storeConfig; the result may be empty.
+     *  The ClickHouse sink is started here because start() creates its queue and flush timers. */
+    private List<InsertData> getInsertServiceList() {
+        List<InsertData> insertServiceList = new ArrayList<>();
+        if (storeConfig.isMysqlStore()) {
+            insertServiceList.add(new MySqlService(auditDataDao));
+        }
+        if (storeConfig.isElasticsearchStore()) {
+            insertServiceList.add(esService);
         }
+        if (storeConfig.isClickHouseStore()) {
+            ClickHouseService chService = new ClickHouseService(chConfig);
+            chService.start();
+            insertServiceList.add(chService);
+        }
+        return insertServiceList;
     }
-
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
new file mode 100644
index 000000000..24042d5aa
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.config.ClickHouseConfig;
+import org.apache.inlong.audit.db.entities.ClickHouseDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Audit sink that buffers records in memory and writes them to ClickHouse in JDBC batches.
+ */
+public class ClickHouseService implements InsertData, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseService.class);
+    public static final String INSERT_SQL = "insert into audit_data (ip, docker_id, thread_id,\r\n"
+            + " sdk_ts, packet_id, log_ts,\r\n"
+            + " inlong_group_id, inlong_stream_id, audit_id,\r\n"
+            + " count, size, delay, \r\n"
+            + " update_time)\r\n"
+            + " values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    private final ClickHouseConfig chConfig;
+
+    private final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+    private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
+    private final AtomicBoolean needBatchOutput = new AtomicBoolean(false);
+    private final AtomicInteger batchCounter = new AtomicInteger(0);
+
+    private Connection conn;
+
+    /**
+     * Constructor
+     * @param chConfig ClickHouse service config, such as jdbc url, jdbc username, jdbc password.
+     */
+    public ClickHouseService(ClickHouseConfig chConfig) {
+        this.chConfig = chConfig;
+    }
+
+    /**
+     * Creates the buffer queue, opens the JDBC connection and schedules the two flush timers.
+     */
+    public void start() {
+        this.batchQueue = new LinkedBlockingQueue<>(
+                chConfig.getBatchThreshold() * chConfig.getBatchIntervalMs() / chConfig.getProcessIntervalMs());
+        // connection; a failure is only logged here, processOutput() reconnects after an error
+        try {
+            Class.forName(chConfig.getDriver());
+            this.reconnect();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        // scheduleWithFixedDelay takes a delay relative to now, not an absolute timestamp
+        // batch output interval
+        timerService.scheduleWithFixedDelay(() -> needBatchOutput.compareAndSet(false, true),
+                chConfig.getBatchIntervalMs(),
+                chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+        // batch output process
+        timerService.scheduleWithFixedDelay(() -> processOutput(),
+                chConfig.getProcessIntervalMs(),
+                chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Drains the queue into ClickHouse when a flush has been requested; invoked by the timer.
+     */
+    private void processOutput() {
+        if (!this.needBatchOutput.get()) {
+            return;
+        }
+        try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) {
+            int counter = 0;
+            // poll() is re-evaluated on every pass so each queued record is added exactly once
+            for (ClickHouseDataPo data = batchQueue.poll(); data != null; data = batchQueue.poll()) {
+                pstat.setString(1, data.getIp());
+                pstat.setString(2, data.getDockerId());
+                pstat.setString(3, data.getThreadId());
+                pstat.setTimestamp(4, data.getSdkTs());
+                pstat.setLong(5, data.getPacketId());
+                pstat.setTimestamp(6, data.getLogTs());
+                pstat.setString(7, data.getInlongGroupId());
+                pstat.setString(8, data.getInlongStreamId());
+                pstat.setString(9, data.getAuditId());
+                pstat.setLong(10, data.getCount());
+                pstat.setLong(11, data.getSize());
+                pstat.setLong(12, data.getDelay());
+                pstat.setTimestamp(13, data.getUpdateTime());
+                pstat.addBatch();
+                this.batchCounter.decrementAndGet();
+                if (++counter >= chConfig.getBatchThreshold()) {
+                    pstat.executeBatch();
+                    this.conn.commit();
+                    counter = 0;
+                }
+            }
+            this.batchCounter.set(0);
+            pstat.executeBatch();
+            this.conn.commit();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            try {
+                this.reconnect();
+            } catch (SQLException e1) {
+                LOG.error(e1.getMessage(), e1);
+            }
+        }
+
+        // recover
+        this.needBatchOutput.compareAndSet(true, false);
+    }
+
+    /**
+     * Closes the current connection, if any, and opens a new one with auto-commit disabled.
+     * @throws SQLException Exception when creating connection.
+     */
+    private void reconnect() throws SQLException {
+        if (this.conn != null) {
+            try {
+                this.conn.close();
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+            this.conn = null;
+        }
+        this.conn = DriverManager.getConnection(chConfig.getUrl(), chConfig.getUsername(),
+                chConfig.getPassword());
+        this.conn.setAutoCommit(false);
+    }
+
+    /**
+     * Converts the message and queues it for the next batch; start() must have been called first.
+     * @param msgBody audit data reading from Pulsar or other MessageQueue.
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+        ClickHouseDataPo data = new ClickHouseDataPo();
+        data.setIp(msgBody.getIp());
+        data.setThreadId(msgBody.getThreadId());
+        data.setDockerId(msgBody.getDockerId());
+        data.setPacketId(msgBody.getPacketId());
+        data.setSdkTs(new Timestamp(msgBody.getSdkTs()));
+        data.setLogTs(new Timestamp(msgBody.getLogTs()));
+        data.setAuditId(msgBody.getAuditId());
+        data.setCount(msgBody.getCount());
+        data.setDelay(msgBody.getDelay());
+        data.setInlongGroupId(msgBody.getInlongGroupId());
+        data.setInlongStreamId(msgBody.getInlongStreamId());
+        data.setSize(msgBody.getSize());
+        data.setUpdateTime(new Timestamp(System.currentTimeMillis()));
+        try {
+            this.batchQueue.put(data);
+            if (this.batchCounter.incrementAndGet() >= chConfig.getBatchThreshold()) {
+                this.needBatchOutput.compareAndSet(false, true);
+            }
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the calling thread before converting the exception
+            Thread.currentThread().interrupt();
+            LOG.error(e.getMessage(), e);
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stops the timers and closes the connection if one was opened.
+     * @throws Exception Exception when closing ClickHouse connection.
+     */
+    @Override
+    public void close() throws Exception {
+        // stop scheduling flushes first, so a failing conn.close() cannot leave the executor running
+        this.timerService.shutdown();
+        if (this.conn != null) {
+            this.conn.close();
+        }
+    }
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
index ca573564b..b381a4623 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java
@@ -20,8 +20,10 @@ package org.apache.inlong.audit.service;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
import org.apache.inlong.audit.config.ElasticsearchConfig;
import org.apache.inlong.audit.db.entities.ESDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -55,7 +57,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Service
-public class ElasticsearchService implements AutoCloseable {
+public class ElasticsearchService implements InsertData, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchService.class);
@@ -71,6 +73,7 @@ public class ElasticsearchService implements AutoCloseable {
public void startTimerRoutine() {
timerService.scheduleAtFixedRate((new Runnable() {
+
@Override
public void run() {
try {
@@ -82,6 +85,7 @@ public class ElasticsearchService implements AutoCloseable {
}), 1, 1, TimeUnit.DAYS);
timerService.scheduleWithFixedDelay((new Runnable() {
+
@Override
public void run() {
try {
@@ -312,7 +316,26 @@ public class ElasticsearchService implements AutoCloseable {
builder.endObject();
return builder;
}
-}
-
-
+    /**
+     * Copies the audit message into an ESDataPo and passes it to insertData.
+     * @param msgBody audit message to store
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+        ESDataPo doc = new ESDataPo();
+        doc.setInlongGroupId(msgBody.getInlongGroupId());
+        doc.setInlongStreamId(msgBody.getInlongStreamId());
+        doc.setAuditId(msgBody.getAuditId());
+        doc.setIp(msgBody.getIp());
+        doc.setDockerId(msgBody.getDockerId());
+        doc.setThreadId(msgBody.getThreadId());
+        doc.setPacketId(msgBody.getPacketId());
+        doc.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
+        doc.setLogTs(new Date(msgBody.getLogTs()));
+        doc.setCount(msgBody.getCount());
+        doc.setSize(msgBody.getSize());
+        doc.setDelay(msgBody.getDelay());
+        insertData(doc);
+    }
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
similarity index 51%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
index 71dbc27c8..61cd32826 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java
@@ -1,10 +1,10 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -15,27 +15,19 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.config;
+package org.apache.inlong.audit.service;
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
+import org.apache.inlong.audit.protocol.AuditData;
-@Component
-@Getter
-@Setter
-public class StoreConfig {
-
- @Value("${audit.config.store.mode:mysql}")
- private String store;
-
- public boolean isMysqlStore() {
- return store.contains("mysql");
- }
-
- public boolean isElasticsearchStore() {
- return store.contains("elasticsearch");
- }
+/**
+ * Sink abstraction for audit records; each implementation targets one store.
+ *
+ */
+public interface InsertData {
+ /**
+ * Handles a single audit message (an implementation may write it directly or buffer it).
+ * @param msgBody audit message deserialized from the message-queue payload
+ */
+ void insert(AuditData msgBody);
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
new file mode 100644
index 000000000..e576b789c
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/MySqlService.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service;
+
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.db.entities.AuditDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+
+import java.util.Date;
+
+/**
+ * InsertData implementation that writes each audit message through AuditDataDao.
+ * The message is copied field by field into an AuditDataPo before the insert.
+ */
+public class MySqlService implements InsertData {
+
+    private final AuditDataDao auditDataDao;
+
+    /**
+     * Creates the sink.
+     * @param dao DAO that receives the converted rows
+     */
+    public MySqlService(AuditDataDao dao) {
+        this.auditDataDao = dao;
+    }
+
+    /**
+     * Converts the audit message to an AuditDataPo and inserts it via the DAO.
+     * @param msgBody audit message to persist
+     */
+    @Override
+    public void insert(AuditData msgBody) {
+        AuditDataPo row = new AuditDataPo();
+        row.setInlongGroupId(msgBody.getInlongGroupId());
+        row.setInlongStreamId(msgBody.getInlongStreamId());
+        row.setAuditId(msgBody.getAuditId());
+        row.setIp(msgBody.getIp());
+        row.setDockerId(msgBody.getDockerId());
+        row.setThreadId(msgBody.getThreadId());
+        row.setPacketId(msgBody.getPacketId());
+        row.setSdkTs(new Date(msgBody.getSdkTs()));
+        row.setLogTs(new Date(msgBody.getLogTs()));
+        row.setCount(msgBody.getCount());
+        row.setSize(msgBody.getSize());
+        row.setDelay(msgBody.getDelay());
+        auditDataDao.insert(row);
+    }
+
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
index f73226b0a..845c55d02 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
@@ -18,69 +18,49 @@
package org.apache.inlong.audit.service.consume;
import com.google.gson.Gson;
+
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.db.entities.ESDataPo;
import org.apache.inlong.audit.protocol.AuditData;
-import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Date;
+import java.util.List;
public abstract class BaseConsume {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseConsume.class);
+
private final Gson gson = new Gson();
- protected AuditDataDao auditDataDao;
- protected ElasticsearchService esService;
+ protected List<InsertData> insertServiceList;
protected StoreConfig storeConfig;
protected MessageQueueConfig mqConfig;
- public BaseConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+ public BaseConsume(List<InsertData> insertServiceList, StoreConfig storeConfig,
MessageQueueConfig mqConfig) {
- this.auditDataDao = auditDataDao;
- this.esService = esService;
+ this.insertServiceList = insertServiceList;
this.storeConfig = storeConfig;
this.mqConfig = mqConfig;
}
public abstract void start();
+ /**
+ * handleMessage
+ * @param body
+ * @throws Exception
+ */
protected void handleMessage(String body) throws Exception {
AuditData msgBody = gson.fromJson(body, AuditData.class);
- if (storeConfig.isMysqlStore()) {
- AuditDataPo po = new AuditDataPo();
- po.setIp(msgBody.getIp());
- po.setThreadId(msgBody.getThreadId());
- po.setDockerId(msgBody.getDockerId());
- po.setPacketId(msgBody.getPacketId());
- po.setSdkTs(new Date(msgBody.getSdkTs()));
- po.setLogTs(new Date(msgBody.getLogTs()));
- po.setAuditId(msgBody.getAuditId());
- po.setCount(msgBody.getCount());
- po.setDelay(msgBody.getDelay());
- po.setInlongGroupId(msgBody.getInlongGroupId());
- po.setInlongStreamId(msgBody.getInlongStreamId());
- po.setSize(msgBody.getSize());
- auditDataDao.insert(po);
- }
- if (storeConfig.isElasticsearchStore()) {
- ESDataPo esPo = new ESDataPo();
- esPo.setIp(msgBody.getIp());
- esPo.setThreadId(msgBody.getThreadId());
- esPo.setDockerId(msgBody.getDockerId());
- esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
- esPo.setLogTs(new Date(msgBody.getLogTs()));
- esPo.setAuditId(msgBody.getAuditId());
- esPo.setCount(msgBody.getCount());
- esPo.setDelay(msgBody.getDelay());
- esPo.setInlongGroupId(msgBody.getInlongGroupId());
- esPo.setInlongStreamId(msgBody.getInlongStreamId());
- esPo.setSize(msgBody.getSize());
- esPo.setPacketId(msgBody.getPacketId());
- esService.insertData(esPo);
- }
+ this.insertServiceList.forEach((service) -> {
+ try {
+ service.insert(msgBody);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ });
}
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
index 089e3a099..d93726380 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
@@ -18,11 +18,11 @@
package org.apache.inlong.audit.service.consume;
import com.google.common.base.Preconditions;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
@@ -45,9 +45,15 @@ public class PulsarConsume extends BaseConsume {
private static final Logger LOG = LoggerFactory.getLogger(PulsarConsume.class);
private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap = new ConcurrentHashMap<>();
- public PulsarConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+ /**
+ * Constructor
+ * @param insertServiceList
+ * @param storeConfig
+ * @param mqConfig
+ */
+ public PulsarConsume(List<InsertData> insertServiceList, StoreConfig storeConfig,
MessageQueueConfig mqConfig) {
- super(auditDataDao, esService, storeConfig, mqConfig);
+ super(insertServiceList, storeConfig, mqConfig);
}
@Override
@@ -79,9 +85,8 @@ public class PulsarConsume extends BaseConsume {
}
protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
- List<Consumer<byte[]>> list =
- topicConsumerMap.computeIfAbsent(mqConfig.getPulsarTopic(),
- key -> new ArrayList<Consumer<byte[]>>());
+ List<Consumer<byte[]>> list = topicConsumerMap.computeIfAbsent(mqConfig.getPulsarTopic(),
+ key -> new ArrayList<Consumer<byte[]>>());
int currentConsumerNum = list.size();
int createNum = mqConfig.getConcurrentConsumerNum() - currentConsumerNum;
/*
@@ -120,6 +125,7 @@ public class PulsarConsume extends BaseConsume {
.receiverQueueSize(mqConfig.getConsumerReceiveQueueSize())
.enableRetry(mqConfig.isPulsarConsumerEnableRetry())
.messageListener(new MessageListener<byte[]>() {
+
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
try {
String body = new String(msg.getData(), StandardCharsets.UTF_8);
@@ -135,7 +141,7 @@ public class PulsarConsume extends BaseConsume {
consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
} catch (PulsarClientException pulsarClientException) {
LOG.error("Consumer reconsumeLater has exception "
- + "topic {}, subName {}, ex {}",
+ + "topic {}, subName {}, ex {}",
topic,
mqConfig.getPulsarConsumerSubName(),
pulsarClientException);
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
index 37d976950..a70979fa6 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
@@ -18,12 +18,12 @@
package org.apache.inlong.audit.service.consume;
import com.google.common.base.Preconditions;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
-import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
@@ -46,9 +46,15 @@ public class TubeConsume extends BaseConsume {
private String topic;
private int fetchThreadCnt = 4;
- public TubeConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+ /**
+ * Constructor
+ * @param insertServiceList
+ * @param storeConfig
+ * @param mqConfig
+ */
+ public TubeConsume(List<InsertData> insertServiceList, StoreConfig storeConfig,
MessageQueueConfig mqConfig) {
- super(auditDataDao, esService, storeConfig, mqConfig);
+ super(insertServiceList, storeConfig, mqConfig);
}
@Override
diff --git a/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
new file mode 100644
index 000000000..f9dd34d80
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- ClickHouse schema for the InLong audit store.
+-- The MySQL-only session statements (SET NAMES, FOREIGN_KEY_CHECKS, SQL_MODE,
+-- time_zone) are intentionally not used here: they are not ClickHouse settings
+-- and would make the script fail before any object is created.
+
+CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
+USE apache_inlong_audit;
+
+CREATE TABLE IF NOT EXISTS `audit_data`
+(
+ `ip` String COMMENT 'client ip',
+ `docker_id` String COMMENT 'client docker id',
+ `thread_id` String COMMENT 'client thread id',
+ `sdk_ts` DateTime COMMENT 'sdk timestamp',
+ `packet_id` Int64 COMMENT 'packet id',
+ `log_ts` DateTime COMMENT 'log timestamp',
+ `inlong_group_id` String COMMENT 'inlong group id',
+ `inlong_stream_id` String COMMENT 'inlong stream id',
+ `audit_id` String COMMENT 'audit id',
+ `count` Int64 COMMENT 'msg count',
+ `size` Int64 COMMENT 'msg size',
+ `delay` Int64 COMMENT 'msg delay',
+ `update_time` DateTime COMMENT 'update time'
+)
+ENGINE = MergeTree
+ORDER BY inlong_group_id
+SETTINGS index_granularity = 8192;
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
index 22784cca4..130156aae 100644
--- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
@@ -17,16 +17,23 @@
package org.apache.inlong.audit.service.consume;
+import org.apache.inlong.audit.config.ClickHouseConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ClickHouseService;
import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.audit.service.InsertData;
+import org.apache.inlong.audit.service.MySqlService;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,6 +42,7 @@ public class TubeConsumeTest {
private PullMessageConsumer pullMessageConsumer;
private AuditDataDao auditDataDao;
private ElasticsearchService esService;
+ private ClickHouseConfig chConfig;
private StoreConfig storeConfig;
private MessageQueueConfig mqConfig;
private String topic = "inlong-audit";
@@ -52,11 +60,28 @@ public class TubeConsumeTest {
when(consumerResult.isSuccess()).thenReturn(false);
}
+ /**
+ * Starts a TubeConsume.Fetcher on its own thread and interrupts that thread right after starting it.
+ * @throws InterruptedException declared by the signature; no call visible in this method throws it
+ */
@Test
public void testConsume() throws InterruptedException {
- Thread consumeFetch = new Thread(new TubeConsume(auditDataDao, esService, storeConfig, mqConfig).new Fetcher(
+ List<InsertData> insertServiceList = this.getInsertServiceList();
+ Thread consumeFetch = new Thread(new TubeConsume(insertServiceList, storeConfig, mqConfig).new Fetcher(
pullMessageConsumer, topic), "fetch thread");
consumeFetch.start();
consumeFetch.interrupt();
}
+
+ /**
+ * Builds the sinks under test: MySqlService(auditDataDao), esService and ClickHouseService(chConfig).
+ * @return a new list holding those three InsertData instances, in that order
+ */
+ private List<InsertData> getInsertServiceList() {
+ List<InsertData> insertServiceList = new ArrayList<>();
+ insertServiceList.add(new MySqlService(auditDataDao));
+ insertServiceList.add(esService);
+ insertServiceList.add(new ClickHouseService(chConfig)); // NOTE(review): no visible hunk assigns chConfig - confirm
+ return insertServiceList;
+ }
}
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index c0b0374b1..5801cdd2a 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -77,3 +77,12 @@ elasticsearch.bulkInterval=10
elasticsearch.bulkThreshold=5000
elasticsearch.auditIdSet=1,2
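+# clickhouse config (NOTE: sql/apache_inlong_audit_clickhouse.sql creates database apache_inlong_audit, the url below selects "default" - confirm they match)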
+clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
+clickhouse.url=jdbc:clickhouse://127.0.0.1:8123/default
+clickhouse.username=default
+clickhouse.password=default
+clickhouse.batchIntervalMs=1000
+clickhouse.batchThreshold=500
+clickhouse.processIntervalMs=100
+
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
new file mode 100644
index 000000000..f9dd34d80
--- /dev/null
+++ b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- ClickHouse schema for the InLong audit store.
+-- The MySQL-only session statements (SET NAMES, FOREIGN_KEY_CHECKS, SQL_MODE,
+-- time_zone) are intentionally not used here: they are not ClickHouse settings
+-- and would make the script fail before any object is created.
+
+CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
+USE apache_inlong_audit;
+
+CREATE TABLE IF NOT EXISTS `audit_data`
+(
+ `ip` String COMMENT 'client ip',
+ `docker_id` String COMMENT 'client docker id',
+ `thread_id` String COMMENT 'client thread id',
+ `sdk_ts` DateTime COMMENT 'sdk timestamp',
+ `packet_id` Int64 COMMENT 'packet id',
+ `log_ts` DateTime COMMENT 'log timestamp',
+ `inlong_group_id` String COMMENT 'inlong group id',
+ `inlong_stream_id` String COMMENT 'inlong stream id',
+ `audit_id` String COMMENT 'audit id',
+ `count` Int64 COMMENT 'msg count',
+ `size` Int64 COMMENT 'msg size',
+ `delay` Int64 COMMENT 'msg delay',
+ `update_time` DateTime COMMENT 'update time'
+)
+ENGINE = MergeTree
+ORDER BY inlong_group_id
+SETTINGS index_granularity = 8192;
diff --git a/licenses/inlong-audit/LICENSE b/licenses/inlong-audit/LICENSE
index 01a309ad5..13aa10a47 100644
--- a/licenses/inlong-audit/LICENSE
+++ b/licenses/inlong-audit/LICENSE
@@ -642,6 +642,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
org.springframework:spring-jcl:5.3.20 - Spring Commons Logging Bridge (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-jdbc:5.3.20 - Spring JDBC (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
org.springframework:spring-tx:5.3.20 - Spring Transaction (https://github.com/spring-projects/spring-framework), (Apache License, Version 2.0)
+ ru.yandex.clickhouse:clickhouse-jdbc:0.3.1 - clickhouse-jdbc (https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc), (The Apache Software License, Version 2.0)
========================================================================
diff --git a/licenses/inlong-sort-standalone/LICENSE b/licenses/inlong-sort-standalone/LICENSE
index aced6c438..38b60db9d 100644
--- a/licenses/inlong-sort-standalone/LICENSE
+++ b/licenses/inlong-sort-standalone/LICENSE
@@ -647,6 +647,7 @@ The text of each license is the standard Apache 2.0 license.
xml-apis:xml-apis:1.4.01 - XML Commons External Components XML APIs (http://xml.apache.org/commons/components/external/), (The Apache Software License, Version 2.0), (Apache 2.0, The SAX License, The W3C License)
org.apache.zookeeper:zookeeper:3.6.3 - Apache ZooKeeper - Server (https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-server), (Apache License, Version 2.0)
org.apache.zookeeper:zookeeper-jute:3.6.3 - Apache ZooKeeper - Jute (https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-jute), (Apache License, Version 2.0)
+ ru.yandex.clickhouse:clickhouse-jdbc:0.3.1 - clickhouse-jdbc (https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-jdbc), (The Apache Software License, Version 2.0)
========================================================================