You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 07:30:00 UTC
[incubator-inlong] branch master updated: [INLONG-3159][Audit] Store support TubeMQ (#3227)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 37391ce [INLONG-3159][Audit] Store support TubeMQ (#3227)
37391ce is described below
commit 37391ce5fe28548b8d53d56ec0b4095f1dd4ece8
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Mar 21 15:29:56 2022 +0800
[INLONG-3159][Audit] Store support TubeMQ (#3227)
---
inlong-audit/audit-store/pom.xml | 42 ++++--
.../{PulsarConfig.java => MessageQueueConfig.java} | 32 +++-
.../audit/service/AuditMsgConsumerServer.java | 168 +++------------------
.../inlong/audit/service/consume/BaseConsume.java | 86 +++++++++++
.../PulsarConsume.java} | 113 +++++---------
.../inlong/audit/service/consume/TubeConsume.java | 142 +++++++++++++++++
.../PulsarConsumeTest.java} | 12 +-
.../audit/service/consume/TubeConsumeTest.java | 62 ++++++++
.../src/test/resources/application-test.properties | 12 +-
inlong-audit/conf/application.properties | 19 ++-
10 files changed, 430 insertions(+), 258 deletions(-)
diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index de901db..57ea3c2 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -17,9 +17,9 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-audit</artifactId>
@@ -163,6 +163,17 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<resources>
@@ -240,7 +251,8 @@
<archive>
<manifest>
<mainClass>${start-class}</mainClass>
- <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ <addDefaultImplementationEntries>true
+ </addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
@@ -252,7 +264,8 @@
<archive>
<manifest>
<mainClass>${start-class}</mainClass>
- <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ <addDefaultImplementationEntries>true
+ </addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
@@ -281,7 +294,9 @@
<verbose>true</verbose>
<dateFormat>yyyy-MM-dd'T'HH:mm:ssZ</dateFormat>
<generateGitPropertiesFile>true</generateGitPropertiesFile>
- <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
+ <generateGitPropertiesFilename>
+ ${project.build.outputDirectory}/git.properties
+ </generateGitPropertiesFilename>
</configuration>
</plugin>
<plugin>
@@ -332,17 +347,22 @@
</goals>
<configuration>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
- <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
+ <transformer
+ implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
</transformers>
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/PulsarConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
similarity index 68%
rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/PulsarConfig.java
rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
index 31bf285..64c80cd 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/PulsarConfig.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java
@@ -25,14 +25,15 @@ import org.springframework.stereotype.Component;
@Component
@Getter
@Setter
-public class PulsarConfig {
- @Value("${audit.pulsar.server.url}")
+public class MessageQueueConfig {
+
+ @Value("${audit.pulsar.server.url:}")
private String pulsarServerUrl;
- @Value("${audit.pulsar.topic}")
+ @Value("${audit.pulsar.topic:}")
private String pulsarTopic;
- @Value("${audit.pulsar.consumer.sub.name}")
+ @Value("${audit.pulsar.consumer.sub.name:}")
private String pulsarConsumerSubName;
@Value("${audit.pulsar.consumer.enable.retry:false}")
@@ -47,4 +48,27 @@ public class PulsarConfig {
@Value("${audit.pulsar.client.concurrent.consumer.num:1}")
private int concurrentConsumerNum = 1;
+ @Value("${audit.tube.masterlist:}")
+ private String tubeMasterList;
+
+ @Value("${audit.tube.topic:}")
+ private String tubeTopic;
+
+ @Value("${audit.tube.consumer.group.name:}")
+ private String tubeConsumerGroupName;
+
+ @Value("${audit.tube.consumer.thread.num:4}")
+ private int tubeThreadNum;
+
+ @Value("${audit.config.proxy.type:pulsar}")
+ private String mqType;
+    /** Tells whether the configured queue type is "pulsar", ignoring case and surrounding whitespace. */
+    public boolean isPulsar() {
+        return "pulsar".equalsIgnoreCase(mqType.trim());
+    }
+    /** Tells whether the configured queue type is "tube", ignoring case and surrounding whitespace. */
+    public boolean isTube() {
+        return "tube".equalsIgnoreCase(mqType.trim());
+    }
+
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index a98054b..bb3098c 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,42 +17,24 @@
package org.apache.inlong.audit.service;
-import com.google.gson.Gson;
-import org.apache.commons.lang.StringUtils;
-import org.apache.inlong.audit.config.PulsarConfig;
+import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.db.entities.ESDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.inlong.audit.service.consume.BaseConsume;
+import org.apache.inlong.audit.service.consume.PulsarConsume;
+import org.apache.inlong.audit.service.consume.TubeConsume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
@Service
public class AuditMsgConsumerServer implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(AuditMsgConsumerServer.class);
- private final Gson gson = new Gson();
- private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap = new ConcurrentHashMap<>();
-
@Autowired
- private PulsarConfig pulsarConfig;
+ private MessageQueueConfig mqConfig;
@Autowired
private AuditDataDao auditDataDao;
@Autowired
@@ -61,136 +43,24 @@ public class AuditMsgConsumerServer implements InitializingBean {
private StoreConfig storeConfig;
public void afterPropertiesSet() {
- PulsarClient pulsarClient = getOrCreatePulsarClient(pulsarConfig.getPulsarServerUrl());
- if (storeConfig.isElasticsearchStore()) {
- esService.startTimerRoutine();
- }
- updateConcurrentConsumer(pulsarClient);
- }
-
- private PulsarClient getOrCreatePulsarClient(String pulsarServerUrl) {
- LOG.info("start consumer pulsarServerUrl = {}", pulsarServerUrl);
- PulsarClient pulsarClient = null;
- try {
- pulsarClient = PulsarClient.builder().serviceUrl(pulsarServerUrl)
- .connectionTimeout(pulsarConfig.getClientOperationTimeoutSecond(),
- TimeUnit.SECONDS).build();
- } catch (PulsarClientException e) {
- LOG.error("getOrCreatePulsarClient has pulsar {} err {}", pulsarServerUrl, e);
- }
- return pulsarClient;
- }
-
- protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
- List<Consumer<byte[]>> list =
- topicConsumerMap.computeIfAbsent(pulsarConfig.getPulsarTopic(),
- key -> new ArrayList<Consumer<byte[]>>());
- int currentConsumerNum = list.size();
- int createNum = pulsarConfig.getConcurrentConsumerNum() - currentConsumerNum;
- /*
- * add consumer
- */
- if (createNum > 0) {
- for (int i = 0; i < pulsarConfig.getConcurrentConsumerNum(); i++) {
- Consumer<byte[]> consumer = createConsumer(pulsarClient, pulsarConfig.getPulsarTopic());
- if (consumer != null) {
- list.add(consumer);
- }
- }
- } else if (createNum < 0) {
- /*
- * delete consumer
- */
- int removeIndex = currentConsumerNum - 1;
- for (int i = createNum; i < 0; i++) {
- Consumer<byte[]> consumer = list.remove(removeIndex);
- consumer.closeAsync();
- removeIndex -= 1;
- }
+ BaseConsume mqConsume;
+ if (mqConfig.isPulsar()) {
+ mqConsume = new PulsarConsume(auditDataDao, esService, storeConfig, mqConfig);
+ } else if (mqConfig.isTube()) {
+ mqConsume = new TubeConsume(auditDataDao, esService, storeConfig, mqConfig);
+ } else {
+ LOG.error("unkown MessageQueue {}", mqConfig.getMqType());
+ return;
}
- }
- protected Consumer<byte[]> createConsumer(PulsarClient pulsarClient, String topic) {
- Consumer<byte[]> consumer = null;
- if (pulsarClient != null && StringUtils.isNotEmpty(topic)) {
- LOG.info("createConsumer has topic {}, subName {}", topic,
- pulsarConfig.getPulsarConsumerSubName());
- try {
- consumer = pulsarClient.newConsumer()
- .subscriptionName(pulsarConfig.getPulsarConsumerSubName())
- .subscriptionType(SubscriptionType.Shared)
- .topic(topic)
- .receiverQueueSize(pulsarConfig.getConsumerReceiveQueueSize())
- .enableRetry(pulsarConfig.isPulsarConsumerEnableRetry())
- .messageListener(new MessageListener<byte[]>() {
- public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
- try {
- handleMessage(msg);
- consumer.acknowledge(msg);
- } catch (Exception e) {
- LOG.error("Consumer has exception topic {}, subName {}, ex {}",
- topic,
- pulsarConfig.getPulsarConsumerSubName(),
- e);
- if (pulsarConfig.isPulsarConsumerEnableRetry()) {
- try {
- consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
- } catch (PulsarClientException pulsarClientException) {
- LOG.error("Consumer reconsumeLater has exception "
- + "topic {}, subName {}, ex {}",
- topic,
- pulsarConfig.getPulsarConsumerSubName(),
- pulsarClientException);
- }
- } else {
- consumer.negativeAcknowledge(msg);
- }
- }
- }
- })
- .subscribe();
- } catch (PulsarClientException e) {
- LOG.error("createConsumer has topic {}, subName {}, err {}", topic,
- pulsarConfig.getPulsarConsumerSubName(), e);
- }
+ if (storeConfig.isElasticsearchStore()) {
+ esService.startTimerRoutine();
}
- return consumer;
- }
- protected void handleMessage(Message<byte[]> msg) throws Exception {
- String body = new String(msg.getData(), StandardCharsets.UTF_8);
- AuditData msgBody = gson.fromJson(body, AuditData.class);
- if (storeConfig.isMysqlStore()) {
- AuditDataPo po = new AuditDataPo();
- po.setIp(msgBody.getIp());
- po.setThreadId(msgBody.getThreadId());
- po.setDockerId(msgBody.getDockerId());
- po.setPacketId(msgBody.getPacketId());
- po.setSdkTs(new Date(msgBody.getSdkTs()));
- po.setLogTs(new Date(msgBody.getLogTs()));
- po.setAuditId(msgBody.getAuditId());
- po.setCount(msgBody.getCount());
- po.setDelay(msgBody.getDelay());
- po.setInlongGroupId(msgBody.getInlongGroupId());
- po.setInlongStreamId(msgBody.getInlongStreamId());
- po.setSize(msgBody.getSize());
- auditDataDao.insert(po);
- }
- if (storeConfig.isElasticsearchStore()) {
- ESDataPo esPo = new ESDataPo();
- esPo.setIp(msgBody.getIp());
- esPo.setThreadId(msgBody.getThreadId());
- esPo.setDockerId(msgBody.getDockerId());
- esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
- esPo.setLogTs(new Date(msgBody.getLogTs()));
- esPo.setAuditId(msgBody.getAuditId());
- esPo.setCount(msgBody.getCount());
- esPo.setDelay(msgBody.getDelay());
- esPo.setInlongGroupId(msgBody.getInlongGroupId());
- esPo.setInlongStreamId(msgBody.getInlongStreamId());
- esPo.setSize(msgBody.getSize());
- esPo.setPacketId(msgBody.getPacketId());
- esService.insertData(esPo);
+ if (mqConsume != null) {
+ mqConsume.start();
+ } else {
+ LOG.error("fail to auditMsgConsumerServer");
}
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
new file mode 100644
index 0000000..f73226b
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import com.google.gson.Gson;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.db.entities.AuditDataPo;
+import org.apache.inlong.audit.db.entities.ESDataPo;
+import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.service.ElasticsearchService;
+
+import java.util.Date;
+
+/** Common base of the audit consumers: turns one JSON record into rows for the enabled stores. */
+public abstract class BaseConsume {
+    private final Gson gson = new Gson();
+    protected AuditDataDao auditDataDao;
+    protected ElasticsearchService esService;
+    protected StoreConfig storeConfig;
+    protected MessageQueueConfig mqConfig;
+
+    public BaseConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+            MessageQueueConfig mqConfig) {
+        this.mqConfig = mqConfig;
+        this.storeConfig = storeConfig;
+        this.esService = esService;
+        this.auditDataDao = auditDataDao;
+    }
+
+    /** Starts consuming from the message queue; each queue type supplies its own implementation. */
+    public abstract void start();
+
+    /** Parses the JSON body into AuditData and inserts it into MySQL and/or Elasticsearch per StoreConfig. */
+    protected void handleMessage(String body) throws Exception {
+        AuditData audit = gson.fromJson(body, AuditData.class);
+        if (storeConfig.isMysqlStore()) {
+            AuditDataPo row = new AuditDataPo();
+            row.setIp(audit.getIp());
+            row.setThreadId(audit.getThreadId());
+            row.setDockerId(audit.getDockerId());
+            row.setPacketId(audit.getPacketId());
+            row.setSdkTs(new Date(audit.getSdkTs()));
+            row.setLogTs(new Date(audit.getLogTs()));
+            row.setAuditId(audit.getAuditId());
+            row.setCount(audit.getCount());
+            row.setDelay(audit.getDelay());
+            row.setInlongGroupId(audit.getInlongGroupId());
+            row.setInlongStreamId(audit.getInlongStreamId());
+            row.setSize(audit.getSize());
+            auditDataDao.insert(row);
+        }
+        if (storeConfig.isElasticsearchStore()) {
+            ESDataPo doc = new ESDataPo();
+            doc.setIp(audit.getIp());
+            doc.setThreadId(audit.getThreadId());
+            doc.setDockerId(audit.getDockerId());
+            doc.setSdkTs(new Date(audit.getSdkTs()).getTime());
+            doc.setLogTs(new Date(audit.getLogTs()));
+            doc.setAuditId(audit.getAuditId());
+            doc.setCount(audit.getCount());
+            doc.setDelay(audit.getDelay());
+            doc.setInlongGroupId(audit.getInlongGroupId());
+            doc.setInlongStreamId(audit.getInlongStreamId());
+            doc.setSize(audit.getSize());
+            doc.setPacketId(audit.getPacketId());
+            esService.insertData(doc);
+        }
+    }
+}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
similarity index 57%
copy from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
copy to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
index a98054b..90e91ec 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.service.consume;
-import com.google.gson.Gson;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
-import org.apache.inlong.audit.config.PulsarConfig;
+import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.db.dao.AuditDataDao;
-import org.apache.inlong.audit.db.entities.AuditDataPo;
-import org.apache.inlong.audit.db.entities.ESDataPo;
-import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.service.ElasticsearchService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
@@ -33,38 +31,32 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-@Service
-public class AuditMsgConsumerServer implements InitializingBean {
+public class PulsarConsume extends BaseConsume {
- private static final Logger LOG = LoggerFactory.getLogger(AuditMsgConsumerServer.class);
- private final Gson gson = new Gson();
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarConsume.class);
private final ConcurrentHashMap<String, List<Consumer<byte[]>>> topicConsumerMap = new ConcurrentHashMap<>();
- @Autowired
- private PulsarConfig pulsarConfig;
- @Autowired
- private AuditDataDao auditDataDao;
- @Autowired
- private ElasticsearchService esService;
- @Autowired
- private StoreConfig storeConfig;
+ public PulsarConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+ MessageQueueConfig mqConfig) {
+ super(auditDataDao, esService, storeConfig, mqConfig);
+ }
- public void afterPropertiesSet() {
- PulsarClient pulsarClient = getOrCreatePulsarClient(pulsarConfig.getPulsarServerUrl());
- if (storeConfig.isElasticsearchStore()) {
- esService.startTimerRoutine();
- }
+ @Override
+ public void start() {
+ String pulsarUrl = mqConfig.getPulsarServerUrl();
+ Preconditions.checkArgument(StringUtils.isNotEmpty(pulsarUrl), "no pulsar server url specified");
+ Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getPulsarTopic()),
+ "no pulsar topic specified");
+ Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getPulsarConsumerSubName()),
+ "no pulsar consumeSubName specified");
+ PulsarClient pulsarClient = getOrCreatePulsarClient(pulsarUrl);
updateConcurrentConsumer(pulsarClient);
}
@@ -73,7 +65,7 @@ public class AuditMsgConsumerServer implements InitializingBean {
PulsarClient pulsarClient = null;
try {
pulsarClient = PulsarClient.builder().serviceUrl(pulsarServerUrl)
- .connectionTimeout(pulsarConfig.getClientOperationTimeoutSecond(),
+ .connectionTimeout(mqConfig.getClientOperationTimeoutSecond(),
TimeUnit.SECONDS).build();
} catch (PulsarClientException e) {
LOG.error("getOrCreatePulsarClient has pulsar {} err {}", pulsarServerUrl, e);
@@ -83,16 +75,16 @@ public class AuditMsgConsumerServer implements InitializingBean {
protected void updateConcurrentConsumer(PulsarClient pulsarClient) {
List<Consumer<byte[]>> list =
- topicConsumerMap.computeIfAbsent(pulsarConfig.getPulsarTopic(),
+ topicConsumerMap.computeIfAbsent(mqConfig.getPulsarTopic(),
key -> new ArrayList<Consumer<byte[]>>());
int currentConsumerNum = list.size();
- int createNum = pulsarConfig.getConcurrentConsumerNum() - currentConsumerNum;
+ int createNum = mqConfig.getConcurrentConsumerNum() - currentConsumerNum;
/*
* add consumer
*/
if (createNum > 0) {
- for (int i = 0; i < pulsarConfig.getConcurrentConsumerNum(); i++) {
- Consumer<byte[]> consumer = createConsumer(pulsarClient, pulsarConfig.getPulsarTopic());
+ for (int i = 0; i < mqConfig.getConcurrentConsumerNum(); i++) {
+ Consumer<byte[]> consumer = createConsumer(pulsarClient, mqConfig.getPulsarTopic());
if (consumer != null) {
list.add(consumer);
}
@@ -114,32 +106,33 @@ public class AuditMsgConsumerServer implements InitializingBean {
Consumer<byte[]> consumer = null;
if (pulsarClient != null && StringUtils.isNotEmpty(topic)) {
LOG.info("createConsumer has topic {}, subName {}", topic,
- pulsarConfig.getPulsarConsumerSubName());
+ mqConfig.getPulsarConsumerSubName());
try {
consumer = pulsarClient.newConsumer()
- .subscriptionName(pulsarConfig.getPulsarConsumerSubName())
+ .subscriptionName(mqConfig.getPulsarConsumerSubName())
.subscriptionType(SubscriptionType.Shared)
.topic(topic)
- .receiverQueueSize(pulsarConfig.getConsumerReceiveQueueSize())
- .enableRetry(pulsarConfig.isPulsarConsumerEnableRetry())
+ .receiverQueueSize(mqConfig.getConsumerReceiveQueueSize())
+ .enableRetry(mqConfig.isPulsarConsumerEnableRetry())
.messageListener(new MessageListener<byte[]>() {
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
try {
- handleMessage(msg);
+ String body = new String(msg.getData(), StandardCharsets.UTF_8);
+ handleMessage(body);
consumer.acknowledge(msg);
} catch (Exception e) {
LOG.error("Consumer has exception topic {}, subName {}, ex {}",
topic,
- pulsarConfig.getPulsarConsumerSubName(),
+ mqConfig.getPulsarConsumerSubName(),
e);
- if (pulsarConfig.isPulsarConsumerEnableRetry()) {
+ if (mqConfig.isPulsarConsumerEnableRetry()) {
try {
consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
} catch (PulsarClientException pulsarClientException) {
LOG.error("Consumer reconsumeLater has exception "
+ "topic {}, subName {}, ex {}",
topic,
- pulsarConfig.getPulsarConsumerSubName(),
+ mqConfig.getPulsarConsumerSubName(),
pulsarClientException);
}
} else {
@@ -151,47 +144,9 @@ public class AuditMsgConsumerServer implements InitializingBean {
.subscribe();
} catch (PulsarClientException e) {
LOG.error("createConsumer has topic {}, subName {}, err {}", topic,
- pulsarConfig.getPulsarConsumerSubName(), e);
+ mqConfig.getPulsarConsumerSubName(), e);
}
}
return consumer;
}
-
- protected void handleMessage(Message<byte[]> msg) throws Exception {
- String body = new String(msg.getData(), StandardCharsets.UTF_8);
- AuditData msgBody = gson.fromJson(body, AuditData.class);
- if (storeConfig.isMysqlStore()) {
- AuditDataPo po = new AuditDataPo();
- po.setIp(msgBody.getIp());
- po.setThreadId(msgBody.getThreadId());
- po.setDockerId(msgBody.getDockerId());
- po.setPacketId(msgBody.getPacketId());
- po.setSdkTs(new Date(msgBody.getSdkTs()));
- po.setLogTs(new Date(msgBody.getLogTs()));
- po.setAuditId(msgBody.getAuditId());
- po.setCount(msgBody.getCount());
- po.setDelay(msgBody.getDelay());
- po.setInlongGroupId(msgBody.getInlongGroupId());
- po.setInlongStreamId(msgBody.getInlongStreamId());
- po.setSize(msgBody.getSize());
- auditDataDao.insert(po);
- }
- if (storeConfig.isElasticsearchStore()) {
- ESDataPo esPo = new ESDataPo();
- esPo.setIp(msgBody.getIp());
- esPo.setThreadId(msgBody.getThreadId());
- esPo.setDockerId(msgBody.getDockerId());
- esPo.setSdkTs(new Date(msgBody.getSdkTs()).getTime());
- esPo.setLogTs(new Date(msgBody.getLogTs()));
- esPo.setAuditId(msgBody.getAuditId());
- esPo.setCount(msgBody.getCount());
- esPo.setDelay(msgBody.getDelay());
- esPo.setInlongGroupId(msgBody.getInlongGroupId());
- esPo.setInlongStreamId(msgBody.getInlongStreamId());
- esPo.setSize(msgBody.getSize());
- esPo.setPacketId(msgBody.getPacketId());
- esService.insertData(esPo);
- }
- }
-
}
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
new file mode 100644
index 0000000..42c0c06
--- /dev/null
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/** Consumes audit messages from TubeMQ with pull-fetcher threads and passes each one to handleMessage. */
+public class TubeConsume extends BaseConsume {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TubeConsume.class);
+    private PullMessageConsumer pullConsumer;
+    private TubeMultiSessionFactory sessionFactory;
+    private String masterUrl;
+    private String topic;
+    private int fetchThreadCnt = 4;
+
+    public TubeConsume(AuditDataDao auditDataDao, ElasticsearchService esService, StoreConfig storeConfig,
+            MessageQueueConfig mqConfig) {
+        super(auditDataDao, esService, storeConfig, mqConfig);
+    }
+
+    /** Validates the tube settings, creates the pull consumer and launches the fetcher threads. */
+    @Override
+    public void start() {
+        masterUrl = mqConfig.getTubeMasterList();
+        Preconditions.checkArgument(StringUtils.isNotEmpty(masterUrl), "no tube masterUrlList specified");
+        topic = mqConfig.getTubeTopic();
+        Preconditions.checkArgument(StringUtils.isNotEmpty(topic), "no tube topic specified");
+        fetchThreadCnt = mqConfig.getTubeThreadNum();
+        Preconditions.checkArgument(fetchThreadCnt > 0, "tube consumer thread num must be positive");
+        Preconditions.checkArgument(StringUtils.isNotEmpty(mqConfig.getTubeConsumerGroupName()),
+                "no tube consumer groupName specified");
+
+        // Without a subscribed consumer the fetchers could only fail or wait forever, so start none.
+        if (!initConsumer()) {
+            LOG.error("tube consumer is not initialized, no fetcher thread is started");
+            return;
+        }
+
+        for (int i = 0; i < fetchThreadCnt; i++) {
+            new Thread(new Fetcher(pullConsumer, topic), "TubeConsume_Fetcher_Thread_" + i).start();
+        }
+    }
+
+    /** Creates and subscribes the pull consumer; returns false when the tube client reports an error. */
+    private boolean initConsumer() {
+        LOG.info("init tube consumer, topic:{}, masterList:{}", topic, masterUrl);
+        ConsumerConfig consumerConfig = new ConsumerConfig(masterUrl, mqConfig.getTubeConsumerGroupName());
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        try {
+            sessionFactory = new TubeMultiSessionFactory(consumerConfig);
+            pullConsumer = sessionFactory.createPullConsumer(consumerConfig);
+            pullConsumer.subscribe(topic, null);
+            pullConsumer.completeSubscribe();
+            return true;
+        } catch (TubeClientException e) {
+            // pass the exception itself so the stack trace is logged, not only its message
+            LOG.error("init tube consumer error", e);
+            return false;
+        }
+    }
+
+    /** Runnable executed by every fetcher thread; all threads share the same pull consumer. */
+    public class Fetcher implements Runnable {
+
+        private final PullMessageConsumer pullMessageConsumer;
+        private final String topic;
+
+        public Fetcher(PullMessageConsumer pullMessageConsumer, String topic) {
+            this.pullMessageConsumer = pullMessageConsumer;
+            this.topic = topic;
+        }
+
+        @Override
+        public void run() {
+            // wait until the partitions are ready; stop waiting if the consumer is shut down
+            while (!pullMessageConsumer.isPartitionsReady(5000) && !pullMessageConsumer.isShutdown()) {
+                LOG.warn("tube partition is not ready, keep waiting");
+            }
+            // consume messages until the consumer is shut down
+            while (!pullMessageConsumer.isShutdown()) {
+                try {
+                    ConsumerResult csmResult = pullMessageConsumer.getMessage();
+                    if (!csmResult.isSuccess()) {
+                        LOG.error("receive messages errorCode is {}, error message is {}",
+                                csmResult.getErrCode(), csmResult.getErrMsg());
+                        continue;
+                    }
+                    List<Message> messageList = csmResult.getMessageList();
+                    if (CollectionUtils.isNotEmpty(messageList)) {
+                        for (Message message : messageList) {
+                            if (StringUtils.equals(message.getTopic(), topic)) {
+                                String body = new String(message.getData(), StandardCharsets.UTF_8);
+                                handleMessage(body);
+                            }
+                        }
+                    }
+                    // reached only when the whole batch was handled; an exception above leaves it unconfirmed
+                    pullMessageConsumer.confirmConsume(csmResult.getConfirmContext(), true);
+                } catch (TubeClientException e) {
+                    LOG.error("tube consumer getMessage error", e);
+                } catch (Exception e) {
+                    LOG.error("handle audit message error", e);
+                }
+            }
+            LOG.warn("consumer is shutdown!");
+        }
+    }
+}
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/AuditStoreTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
similarity index 87%
rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/AuditStoreTest.java
rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
index e75ae33..33e55f7 100644
--- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/AuditStoreTest.java
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service;
+package org.apache.inlong.audit.service.consume;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -31,10 +31,10 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class AuditStoreTest {
+public class PulsarConsumeTest {
private PulsarClientImpl client;
- private AuditMsgConsumerServer auditMsgConsumerServer;
+ private PulsarConsume pulsarConsume;
static <T> PulsarClientImpl createPulsarClientMock() {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class, Mockito.RETURNS_DEEP_STUBS);
@@ -56,14 +56,14 @@ public class AuditStoreTest {
clientConf.setStatsIntervalSeconds(0);
Consumer<byte[]> consumer = client.newConsumer().subscribe();
- auditMsgConsumerServer = mock(AuditMsgConsumerServer.class);
- when(auditMsgConsumerServer.createConsumer(any(), any())).thenReturn(consumer);
+ pulsarConsume = mock(PulsarConsume.class);
+ when(pulsarConsume.createConsumer(any(), any())).thenReturn(consumer);
}
@Test
public void testConsumer() {
String topic = "non-persistent://public/default/audit-test";
- auditMsgConsumerServer.createConsumer(client, topic);
+ pulsarConsume.createConsumer(client, topic);
}
}
diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
new file mode 100644
index 0000000..22784cc
--- /dev/null
+++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.consume;
+
+import org.apache.inlong.audit.config.MessageQueueConfig;
+import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.service.ElasticsearchService;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for the {@code Fetcher} runnable of {@link TubeConsume}, driven by a mocked
+ * {@link PullMessageConsumer} that reports itself as ready and already shut down.
+ */
+public class TubeConsumeTest {
+
+    /** Upper bound, in milliseconds, on how long the test waits for the fetch thread to exit. */
+    private static final long JOIN_TIMEOUT_MS = 5000L;
+
+    private PullMessageConsumer pullMessageConsumer;
+    // The four collaborators below are never assigned, so TubeConsume is constructed with nulls.
+    private AuditDataDao auditDataDao;
+    private ElasticsearchService esService;
+    private StoreConfig storeConfig;
+    private MessageQueueConfig mqConfig;
+    private String topic = "inlong-audit";
+    private ConsumerResult consumerResult;
+
+    /**
+     * Builds the mocked consumer: partitions are ready, the consumer is shut down, and any pull
+     * returns an unsuccessful result.
+     *
+     * @throws TubeClientException declared by the stubbed {@code getMessage()} call
+     */
+    @Before
+    public void setUp() throws TubeClientException {
+        pullMessageConsumer = mock(PullMessageConsumer.class);
+        consumerResult = mock(ConsumerResult.class);
+        when(pullMessageConsumer.isPartitionsReady(5000)).thenReturn(true);
+        when(pullMessageConsumer.isShutdown()).thenReturn(true);
+        when(pullMessageConsumer.getMessage()).thenReturn(consumerResult);
+        when(consumerResult.isSuccess()).thenReturn(false);
+    }
+
+    /**
+     * Runs the fetcher on its own thread and verifies that it terminates once the consumer
+     * reports shutdown, instead of returning while the thread may still be running.
+     *
+     * @throws InterruptedException if the test thread is interrupted while joining
+     */
+    @Test
+    public void testConsume() throws InterruptedException {
+        TubeConsume tubeConsume = new TubeConsume(auditDataDao, esService, storeConfig, mqConfig);
+        Thread consumeFetch = new Thread(tubeConsume.new Fetcher(pullMessageConsumer, topic), "fetch thread");
+        consumeFetch.start();
+        // Bounded join: a fetcher that never leaves its loop fails the assertion below.
+        consumeFetch.join(JOIN_TIMEOUT_MS);
+        org.junit.Assert.assertFalse("fetch thread should exit once the consumer reports shutdown",
+                consumeFetch.isAlive());
+    }
+}
diff --git a/inlong-audit/audit-store/src/test/resources/application-test.properties b/inlong-audit/audit-store/src/test/resources/application-test.properties
index aaf5796..8fcde8c 100644
--- a/inlong-audit/audit-store/src/test/resources/application-test.properties
+++ b/inlong-audit/audit-store/src/test/resources/application-test.properties
@@ -16,16 +16,13 @@
# specific language governing permissions and limitations
# under the License.
#
-
# datasource config, set org.postgresql.Driver if using PostgreSQL
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.druid.driver-class-name=org.h2.Driver
spring.datasource.schema=classpath:sql/apache_inlong_audit.sql
-
spring.datasource.druid.url=jdbc:h2:mem:test;MODE=MYSQL;DB_CLOSE_DELAY=-1;IGNORECASE=TRUE;
spring.datasource.druid.username=root
spring.datasource.druid.password=""
-
spring.datasource.druid.filters=stat,log4j,config
spring.datasource.druid.max-active=100
spring.datasource.druid.initial-size=1
@@ -49,14 +46,23 @@ mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
# audit config
audit.config.file.check.enable=false
audit.config.manager.server.url=http://127.0.0.1:8000
+
# store.server: elasticsearch / mysql
audit.config.store.mode=elasticsearch
+# proxy.type: pulsar / tube
+audit.config.proxy.type=pulsar
+
# pulsar config
audit.pulsar.server.url=pulsar://127.0.0.1:6650
audit.pulsar.topic=persistent://public/default/inlong-audit
audit.pulsar.consumer.sub.name=inlong-audit-subscription
+# tube config
+audit.tube.masterlist=127.0.0.1:8080
+audit.tube.topic=inlong-audit
+audit.tube.consumer.group.name=inlong-audit-consumer
+
# es config
elasticsearch.host=127.0.0.1
elasticsearch.port=9200
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index 45830f5..09fe423 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -16,14 +16,13 @@
# specific language governing permissions and limitations
# under the License.
#
-
# datasource config
# datasource config, set org.postgresql.Driver if using PostgreSQL
-spring.datasource.driver-class-name = com.mysql.jdbc.Driver
-spring.datasource.name = druidDataSource
-spring.datasource.type = com.alibaba.druid.pool.DruidDataSource
-spring.datasource.druid.driver-class-name= com.mysql.cj.jdbc.Driver
-spring.datasource.druid.url= jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
+spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.name=druidDataSource
+spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
+spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
+spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
spring.datasource.druid.username=root
spring.datasource.druid.password=inlong
spring.datasource.druid.filters=stat,log4j,config
@@ -46,6 +45,9 @@ spring.datasource.druid.max-pool-prepared-statement-per-connection-size=20
mybatis.mapper-locations=classpath*:mapper/*.xml
mybatis.type-aliases-package=org.apache.inlong.audit.db.entities
+# proxy.type: pulsar / tube
+audit.config.proxy.type=pulsar
+
# store.server: elasticsearch / mysql
audit.config.store.mode=mysql
@@ -54,6 +56,11 @@ audit.pulsar.server.url=pulsar://127.0.0.1:6650
audit.pulsar.topic=persistent://public/default/inlong-audit
audit.pulsar.consumer.sub.name=inlong-audit-subscription
+# tube config
+audit.tube.masterlist=127.0.0.1:8000
+audit.tube.topic=inlong-audit
+audit.tube.consumer.group.name=inlong-audit-consumer
+
# es config
elasticsearch.host=127.0.0.1
elasticsearch.port=9200