You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 07:30:53 UTC
[incubator-inlong] branch master updated: [INLONG-3158][Audit] Proxy support TubeMQ (#3236)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b05d930 [INLONG-3158][Audit] Proxy support TubeMQ (#3236)
b05d930 is described below
commit b05d930fc317c669daf068b5ccaa35fea32f12cb
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Mon Mar 21 15:30:50 2022 +0800
[INLONG-3158][Audit] Proxy support TubeMQ (#3236)
---
bin/init-config.sh | 2 +-
.../inlong/audit/consts/ConfigConstants.java | 19 +-
inlong-audit/audit-proxy/pom.xml | 17 +-
.../org/apache/inlong/audit/sink/TubeSink.java | 649 +++++++++++++++++++++
.../org/apache/inlong/audit/sink/TubeSinkTest.java | 85 +++
inlong-audit/bin/proxy-start.sh | 29 +-
.../{audit-proxy.conf => audit-proxy-pulsar.conf} | 0
inlong-audit/conf/audit-proxy-tube.conf | 90 +++
8 files changed, 885 insertions(+), 6 deletions(-)
diff --git a/bin/init-config.sh b/bin/init-config.sh
index c138ab5..2635293 100644
--- a/bin/init-config.sh
+++ b/bin/init-config.sh
@@ -39,7 +39,7 @@ init_inlong_audit() {
if [ $source_type == "pulsar" ]; then
echo "Init audit configuration parameters"
cd $INLONG_HOME/inlong-audit/conf
- sed -i 's#pulsar://.*#'''${pulsar_service_url}'''#g' audit-proxy.conf
+ sed -i 's#pulsar://.*#'''${pulsar_service_url}'''#g' audit-proxy-pulsar.conf
sed -i 's#pulsar://.*#'''${pulsar_service_url}'''#g' application.properties
sed -i 's#jdbc:mysql://.*apache_inlong_audit#'''jdbc:mysql://${spring_datasource_hostname}:${spring_datasource_port}/apache_inlong_audit'''#g' application.properties
sed -i 's/spring.datasource.druid.username=.*/'''spring.datasource.druid.username=${spring_datasource_username}'''/g' application.properties
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
index 9c9f710..4c8b936 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
@@ -46,7 +46,24 @@ public class ConfigConstants {
public static final String TRAFFIC_CLASS = "trafficClass";
public static final String MAX_THREADS = "max-threads";
-
+
public static final int MSG_MAX_LENGTH_BYTES = 20 * 1024 * 1024;
+ public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count";
+
+ public static final String SESSION_WARN_DELAYED_MSG_COUNT = "session_warn_delayed_msg_count";
+
+ public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
+
+ public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
+
+ public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+
+ public static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L;
+
+ public static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L;
+
+ public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
+
+ public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
}
diff --git a/inlong-audit/audit-proxy/pom.xml b/inlong-audit/audit-proxy/pom.xml
index 4686631..13f35ee 100644
--- a/inlong-audit/audit-proxy/pom.xml
+++ b/inlong-audit/audit-proxy/pom.xml
@@ -18,9 +18,9 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-audit</artifactId>
@@ -78,5 +78,16 @@
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
new file mode 100644
index 0000000..03e18cf
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.consts.ConfigConstants;
+import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.audit.utils.NetworkUtils;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TubeSink extends AbstractSink implements Configurable {
+
+ private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
+
+ private static final String SINK_THREAD_NUM = "thread-num";
+
+ private static final int defaultRetryCnt = -1;
+
+ private static final int defaultLogEveryNEvents = 100000;
+
+ private static final int defaultSendTimeout = 20000; // in millsec
+
+ private static final Long PRINT_INTERVAL = 30L;
+
+ private static final TubePerformanceTask tubePerformanceTask = new TubePerformanceTask();
+
+ private static final int BAD_EVENT_QUEUE_SIZE = 10000;
+
+ private static final int EVENT_QUEUE_SIZE = 1000;
+
+ private static final String MASTER_HOST_PORT_LIST = "master-host-port-list";
+
+ private static final String TOPIC = "topic";
+
+ private static final String SEND_TIMEOUT = "send_timeout"; // in millsec
+
+ private static final String LOG_EVERY_N_EVENTS = "log-every-n-events";
+
+ private static final String RETRY_CNT = "retry-currentSuccSendedCnt";
+
+ private static int retryCnt = defaultRetryCnt;
+
+ private static AtomicLong totalTubeSuccSendCnt = new AtomicLong(0);
+
+ private static AtomicLong totalTubeSuccSendSize = new AtomicLong(0);
+
+ private static ConcurrentHashMap<String, Long> illegalTopicMap =
+ new ConcurrentHashMap<String, Long>();
+
+ private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1,
+ new HighPriorityThreadFactory("tubePerformance-Printer-thread"));
+
+ static {
+ /*
+ * stat tube performance
+ */
+ logger.info("tubePerformanceTask!!!!!!");
+ scheduledExecutorService.scheduleWithFixedDelay(tubePerformanceTask, 0L,
+ PRINT_INTERVAL, TimeUnit.SECONDS);
+ }
+
+ public MessageProducer producer;
+ public Map<String, MessageProducer> producerMap;
+ public TubeMultiSessionFactory sessionFactory;
+ private SinkCounter sinkCounter;
+ private String topic;
+ private volatile boolean canTake = false;
+ private volatile boolean canSend = false;
+ private LinkedBlockingQueue<EventStat> resendQueue;
+ private LinkedBlockingQueue<Event> eventQueue;
+ private long diskIORatePerSec;
+ private RateLimiter diskRateLimiter;
+ private String masterHostAndPortList;
+ private Integer logEveryNEvents;
+ private Integer sendTimeout;
+ private int threadNum;
+ private Thread[] sinkThreadPool;
+ private long linkMaxAllowedDelayedMsgCount;
+ private long sessionWarnDelayedMsgCount;
+ private long sessionMaxAllowedDelayedMsgCount;
+ private long nettyWriteBufferHighWaterMark;
+ private int recoverthreadcount;
+ private boolean overflow = false;
+ /*
+ * for stat
+ */
+ private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+ private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+ private long t1 = System.currentTimeMillis();
+ private long t2 = 0L;
+
+ private String localIp = "127.0.0.1";
+
+ public TubeSink() {
+ super();
+ logger.debug("new instance of TubeSink!");
+ }
+
+ @Override
+ public synchronized void start() {
+ logger.info("tube sink starting...");
+ try {
+ createConnection();
+ } catch (FlumeException e) {
+ logger.error("Unable to create tube client" + ". Exception follows.", e);
+
+ // prevent leaking resources
+ stop();
+ return;
+ }
+
+ sinkCounter.start();
+ super.start();
+ this.canSend = true;
+ this.canTake = true;
+
+ try {
+ initTopicProducer(topic);
+ } catch (Exception e) {
+ logger.error("tubesink start publish topic fail.", e);
+ }
+
+ for (int i = 0; i < sinkThreadPool.length; i++) {
+ sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
+ sinkThreadPool[i].start();
+ }
+ logger.debug("tubesink started");
+
+ }
+
+ @Override
+ public synchronized void stop() {
+ logger.info("tubesink stopping");
+ destroyConnection();
+ this.canTake = false;
+ int waitCount = 0;
+ while (eventQueue.size() != 0 && waitCount++ < 10) {
+ try {
+ Thread.currentThread().sleep(1000);
+ } catch (InterruptedException e) {
+ logger.info("Stop thread has been interrupt!");
+ break;
+ }
+ }
+ this.canSend = false;
+
+ if (sinkThreadPool != null) {
+ for (Thread thread : sinkThreadPool) {
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+ sinkThreadPool = null;
+ }
+
+ super.stop();
+ if (!scheduledExecutorService.isShutdown()) {
+ scheduledExecutorService.shutdown();
+ }
+ sinkCounter.stop();
+ logger.debug("tubesink stopped. Metrics:{}", sinkCounter);
+ }
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ logger.debug("process......");
+ if (!this.canTake) {
+ return Status.BACKOFF;
+ }
+ Status status = Status.READY;
+ Channel channel = getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ try {
+ Event event = channel.take();
+ if (event != null) {
+ if (diskRateLimiter != null) {
+ diskRateLimiter.acquire(event.getBody().length);
+ }
+ if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
+ logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
+ + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
+ + "it will cause memoryChannel full and fileChannel write.)", getName());
+ tx.rollback();
+ } else {
+ tx.commit();
+ }
+ } else {
+ status = Status.BACKOFF;
+ tx.commit();
+ }
+ } catch (Throwable t) {
+ logger.error("Process event failed!" + this.getName(), t);
+ try {
+ tx.rollback();
+ } catch (Throwable e) {
+ logger.error("tubesink transaction rollback exception", e);
+
+ }
+ } finally {
+ tx.close();
+ }
+ return status;
+ }
+
+ @Override
+ public void configure(Context context) {
+ logger.info("Tubesink started and context = {}", context.toString());
+
+ topic = context.getString(TOPIC);
+ Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic specified");
+
+ masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
+ Preconditions.checkState(masterHostAndPortList != null,
+ "No master and port list specified");
+
+ producerMap = new HashMap<String, MessageProducer>();
+
+ logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, defaultLogEveryNEvents);
+ logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " + logEveryNEvents);
+ Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
+
+ sendTimeout = context.getInteger(SEND_TIMEOUT, defaultSendTimeout);
+ logger.debug(this.getName() + " " + SEND_TIMEOUT + " " + sendTimeout);
+ Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
+
+ retryCnt = context.getInteger(RETRY_CNT, defaultRetryCnt);
+ logger.debug(this.getName() + " " + RETRY_CNT + " " + retryCnt);
+
+ localIp = NetworkUtils.getLocalIp();
+
+ if (sinkCounter == null) {
+ sinkCounter = new SinkCounter(getName());
+ }
+
+ resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
+
+ String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
+ threadNum = Integer.parseInt(sinkThreadNum);
+ Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+ sinkThreadPool = new Thread[threadNum];
+ eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+
+ diskIORatePerSec = context.getLong("disk-io-rate-per-sec", 0L);
+ if (diskIORatePerSec != 0) {
+ diskRateLimiter = RateLimiter.create(diskIORatePerSec);
+ }
+
+ linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
+ ConfigConstants.DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT);
+ sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT,
+ ConfigConstants.DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT);
+ sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
+ ConfigConstants.DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT);
+ nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
+ ConfigConstants.DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK);
+ recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT,
+ Runtime.getRuntime().availableProcessors() + 1);
+
+ }
+
+ public void handleMessageSendSuccess(EventStat es) {
+ //Statistics tube performance
+ totalTubeSuccSendCnt.incrementAndGet();
+ totalTubeSuccSendSize.addAndGet(es.getEvent().getBody().length);
+
+ // add to sinkCounter
+ sinkCounter.incrementEventDrainSuccessCount();
+ currentSuccessSendCnt.incrementAndGet();
+ long nowCnt = currentSuccessSendCnt.get();
+ long oldCnt = lastSuccessSendCnt.get();
+ if (nowCnt % logEveryNEvents == 0 && nowCnt != lastSuccessSendCnt.get()) {
+ lastSuccessSendCnt.set(nowCnt);
+ t2 = System.currentTimeMillis();
+ logger.info("tubesink {}, succ put {} events to tube,"
+ + " in the past {} millsec", new Object[]{
+ getName(), (nowCnt - oldCnt), (t2 - t1)
+ });
+ t1 = t2;
+ }
+
+ }
+
+ /**
+ * Resend the data and store the data in the memory cache.
+ *
+ * @param es
+ * @param isDecrement
+ */
+ private void resendEvent(EventStat es, boolean isDecrement) {
+ try {
+ if (es == null || es.getEvent() == null) {
+ return;
+ }
+ if (!resendQueue.offer(es)) {
+ FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
+ }
+ } catch (Throwable throwable) {
+ logger.error("resendEvent e = {}", throwable);
+ }
+ }
+
+ /**
+ * If this function is called successively without calling {@see #destroyConnection()}, only the
+ * first call has any effect.
+ *
+ * @throws FlumeException if an RPC client connection could not be opened
+ */
+ private void createConnection() throws FlumeException {
+ // if already connected, just skip
+ if (sessionFactory != null) {
+ return;
+ }
+
+ try {
+ TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
+ sessionFactory = new TubeMultiSessionFactory(conf);
+ logger.info("create tube connection successfully");
+ } catch (TubeClientException e) {
+ logger.error("create connnection error in tubesink, "
+ + "maybe tube master set error, please re-check. ex1 {}", e.getMessage());
+ throw new FlumeException("connect to Tube error1, "
+ + "maybe zkstr/zkroot set error, please re-check");
+ } catch (Throwable e) {
+ logger.error("create connnection error in tubesink, "
+ + "maybe tube master set error/shutdown in progress, please re-check. ex2 {}", e);
+ throw new FlumeException("connect to meta error2, "
+ + "maybe tube master set error/shutdown in progress, please re-check");
+ }
+
+ if (producerMap == null) {
+ producerMap = new HashMap<String, MessageProducer>();
+ }
+ }
+
+ private TubeClientConfig initTubeConfig(String masterHostAndPortList) throws Exception {
+ final TubeClientConfig tubeClientConfig = new TubeClientConfig(masterHostAndPortList);
+ tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
+ tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
+ tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
+ tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
+ tubeClientConfig.setHeartbeatPeriodMs(15000L);
+ tubeClientConfig.setRpcTimeoutMs(20000L);
+
+ return tubeClientConfig;
+ }
+
+ private void destroyConnection() {
+ for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
+ MessageProducer producer = entry.getValue();
+ try {
+ producer.shutdown();
+ } catch (TubeClientException e) {
+ logger.error("destroy producer error in tubesink, MetaClientException {}", e.getMessage());
+ } catch (Throwable e) {
+ logger.error("destroy producer error in tubesink, ex {}", e.getMessage());
+ }
+ }
+ producerMap.clear();
+
+ if (sessionFactory != null) {
+ try {
+ sessionFactory.shutdown();
+ } catch (Exception e) {
+ logger.error("destroy sessionFactory error in tubesink, ex {}", e.getMessage());
+ }
+ }
+ sessionFactory = null;
+ logger.debug("closed meta producer");
+ }
+
+ /**
+ * Currently, all topics are published by the same producer. If needed, extend it to multi producers.
+ *
+ * @param topic
+ * @throws TubeClientException
+ */
+ private void initTopicProducer(String topic) throws TubeClientException {
+ if (StringUtils.isEmpty(topic)) {
+ logger.error("topic is empty");
+ return;
+ }
+ if (sessionFactory == null) {
+ throw new TubeClientException("sessionFactory is null, can't create producer");
+ }
+
+ if (producer == null) {
+ producer = sessionFactory.createProducer();
+ }
+
+ producer.publish(topic);
+ producerMap.put(topic, producer);
+ logger.info(getName() + " success publish topic: " + topic);
+ }
+
+ private MessageProducer getProducer(String topic) throws TubeClientException {
+ if (producerMap.containsKey(topic)) {
+ return producerMap.get(topic);
+ } else {
+ synchronized (this) {
+ if (!producerMap.containsKey(topic)) {
+ if (producer == null) {
+ producer = sessionFactory.createProducer();
+ }
+ // publish topic
+ producer.publish(topic);
+ producerMap.put(topic, producer);
+ }
+ }
+ return producerMap.get(topic);
+ }
+
+ }
+
+ static class TubePerformanceTask implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ if (totalTubeSuccSendSize.get() != 0) {
+ logger.info("Total tube performance tps :"
+ + totalTubeSuccSendCnt.get() / PRINT_INTERVAL
+ + "/s, avg msg size:"
+ + totalTubeSuccSendSize.get() / totalTubeSuccSendCnt.get()
+ + ",print every " + PRINT_INTERVAL + " seconds");
+
+ // totalpulsarSuccSendCnt represents the number of packets
+ totalTubeSuccSendSize.set(0);
+ totalTubeSuccSendCnt.set(0);
+ }
+
+ } catch (Exception e) {
+ logger.info("tubePerformanceTask error", e);
+ }
+ }
+ }
+
+ class SinkTask implements Runnable {
+
+ @Override
+ public void run() {
+ logger.info("Sink task {} started.", Thread.currentThread().getName());
+ while (canSend) {
+ boolean decrementFlag = false;
+ Event event = null;
+ EventStat es = null;
+ String topic = null;
+ try {
+ if (TubeSink.this.overflow) {
+ TubeSink.this.overflow = false;
+ Thread.sleep(10);
+ }
+ if (!resendQueue.isEmpty()) {
+ // Send the data in the retry queue first
+ es = resendQueue.poll();
+ if (es != null) {
+ event = es.getEvent();
+ if (event.getHeaders().containsKey(TOPIC)) {
+ topic = event.getHeaders().get(TOPIC);
+ }
+ }
+ } else {
+ event = eventQueue.take();
+ es = new EventStat(event);
+ if (event.getHeaders().containsKey(TOPIC)) {
+ topic = event.getHeaders().get(TOPIC);
+ }
+ }
+
+ if (event == null) {
+ // ignore event is null, when multiple-thread SinkTask running
+ // this null value comes from resendQueue
+ continue;
+ }
+
+ if (topic == null || topic.equals("")) {
+ logger.warn("no topic specified in event header, just skip this event");
+ continue;
+ }
+
+ Long expireTime = illegalTopicMap.get(topic);
+ if (expireTime != null) {
+ long currentTime = System.currentTimeMillis();
+ if (expireTime > currentTime) {
+ continue;
+ } else {
+
+ illegalTopicMap.remove(topic);
+ }
+ }
+
+ final EventStat eventStat = es;
+ boolean sendResult = sendMessage(event, topic, eventStat);
+ if (!sendResult) {
+ continue;
+ }
+
+ decrementFlag = true;
+
+ } catch (InterruptedException e) {
+ logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
+ return;
+ } catch (Throwable throwable) {
+ if (throwable instanceof TubeClientException) {
+ String message = throwable.getMessage();
+ if (message != null && (message.contains("No available queue for topic")
+ || message.contains("The brokers of topic are all forbidden"))) {
+ illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000);
+ logger.info("IllegalTopicMap.put " + topic);
+ continue;
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ //ignore..
+ }
+ }
+ }
+ resendEvent(es, decrementFlag);
+ }
+ }
+ }
+
+ private boolean sendMessage(Event event, String topic, EventStat es)
+ throws TubeClientException, InterruptedException {
+ MessageProducer producer = getProducer(topic);
+ if (producer == null) {
+ illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
+ logger.error("Get producer is null, topic:{}", topic);
+ return false;
+ }
+
+ Message message = new Message(topic, event.getBody());
+ message.setAttrKeyVal("auditIp", localIp);
+ String streamId = "";
+ String groupId = "";
+ if (event.getHeaders().containsKey(org.apache.inlong.audit.consts.AttributeConstants.INLONG_STREAM_ID)) {
+ streamId = event.getHeaders().get(org.apache.inlong.audit.consts.AttributeConstants.INLONG_STREAM_ID);
+ message.setAttrKeyVal(org.apache.inlong.audit.consts.AttributeConstants.INLONG_STREAM_ID, streamId);
+ }
+ if (event.getHeaders().containsKey(org.apache.inlong.audit.consts.AttributeConstants.INLONG_GROUP_ID)) {
+ groupId = event.getHeaders().get(org.apache.inlong.audit.consts.AttributeConstants.INLONG_GROUP_ID);
+ message.setAttrKeyVal(org.apache.inlong.audit.consts.AttributeConstants.INLONG_GROUP_ID, groupId);
+ }
+
+ logger.debug("producer start to send msg...");
+ producer.sendMessage(message, new MyCallback(es));
+
+ illegalTopicMap.remove(topic);
+ return true;
+ }
+ }
+
+ public class MyCallback implements MessageSentCallback {
+
+ private org.apache.inlong.audit.sink.EventStat myEventStat;
+ private long sendTime;
+
+ public MyCallback(org.apache.inlong.audit.sink.EventStat eventStat) {
+ this.myEventStat = eventStat;
+ this.sendTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void onMessageSent(final MessageSentResult result) {
+ if (result.isSuccess()) {
+ handleMessageSendSuccess(myEventStat);
+ return;
+ }
+
+ // handle sent error
+ if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
+ logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
+ result.getErrMsg(), resendQueue.size(),
+ myEventStat.getEvent().hashCode());
+
+ return;
+ }
+ if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
+ logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
+ result.getErrMsg(), resendQueue.size(),
+ myEventStat.getEvent().hashCode());
+ }
+ myEventStat.incRetryCnt();
+ resendEvent(myEventStat, true);
+
+ }
+
+ @Override
+ public void onException(final Throwable e) {
+ Throwable t = e;
+ while (t.getCause() != null) {
+ t = t.getCause();
+ }
+ if (t instanceof OverflowException) {
+ org.apache.inlong.audit.sink.TubeSink.this.overflow = true;
+ }
+ myEventStat.incRetryCnt();
+ resendEvent(myEventStat, true);
+ }
+ }
+
+}
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
new file mode 100644
index 0000000..69f601b
--- /dev/null
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+public class TubeSinkTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TubeSink.class);
+
+ private TubeSink tubeSink;
+ private Channel channel;
+ private Context context;
+
+ @Before
+ public void setUp() {
+ tubeSink = new TubeSink();
+ channel = new MemoryChannel();
+ context = new Context();
+
+ context.put("topic", "inlong-audit");
+ context.put("master-host-port-list", "127.0.0.1:8080");
+
+ tubeSink.setChannel(channel);
+ Configurables.configure(tubeSink, context);
+ Configurables.configure(channel, context);
+
+ }
+
+ @Test
+ public void testProcess() throws InterruptedException, EventDeliveryException {
+ Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+ tubeSink.start();
+ Assert.assertTrue(LifecycleController.waitForOneOf(tubeSink, LifecycleState.START_OR_ERROR, 5000));
+ Transaction transaction = channel.getTransaction();
+
+ transaction.begin();
+ for (int i = 0; i < 10; i++) {
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ for (int i = 0; i < 5; i++) {
+ Sink.Status status = tubeSink.process();
+ Assert.assertEquals(Sink.Status.READY, status);
+ }
+
+ tubeSink.stop();
+ Assert.assertTrue(LifecycleController.waitForOneOf(tubeSink,
+ LifecycleState.STOP_OR_ERROR, 5000));
+ }
+
+}
diff --git a/inlong-audit/bin/proxy-start.sh b/inlong-audit/bin/proxy-start.sh
index c8412d3..80a72ae 100644
--- a/inlong-audit/bin/proxy-start.sh
+++ b/inlong-audit/bin/proxy-start.sh
@@ -19,8 +19,35 @@
# under the License.
#
cd "$(dirname "$0")"/../
+
+error() {
+ local msg=$1
+ local exit_code=$2
+
+ echo "Error: $msg" >&2
+
+ if [ -n "$exit_code" ]; then
+ exit $exit_code
+ fi
+}
+
prepare_file=conf/server.properties
if [ ! -f "$prepare_file" ]; then
touch "$prepare_file"
fi
-nohup bin/audit-proxy agent --conf conf/ -f conf/audit-proxy.conf -n agent1 --no-reload-conf > audit-proxy.log 2>&1 &
\ No newline at end of file
+
+MQ_TYPE=pulsar
+if [ -n "$1" ]; then
+ MQ_TYPE=$1
+fi
+
+basedir="$(pwd)"
+CONFIG_FILE="audit-proxy-${MQ_TYPE}.conf"
+CONFIG_FILE_WITH_COFING_PATH="conf/${CONFIG_FILE}"
+CONFIG_FILE_WITH_PATH="${basedir}/conf/${CONFIG_FILE}"
+
+if [ -f "$CONFIG_FILE_WITH_PATH" ]; then
+ nohup bin/audit-proxy agent --conf conf/ -f "${CONFIG_FILE_WITH_COFING_PATH}" -n agent1 --no-reload-conf >audit-proxy.log 2>&1 &
+else
+ error "${CONFIG_FILE_WITH_PATH} is not exist! start failed!" 1
+fi
diff --git a/inlong-audit/conf/audit-proxy.conf b/inlong-audit/conf/audit-proxy-pulsar.conf
similarity index 100%
rename from inlong-audit/conf/audit-proxy.conf
rename to inlong-audit/conf/audit-proxy-pulsar.conf
diff --git a/inlong-audit/conf/audit-proxy-tube.conf b/inlong-audit/conf/audit-proxy-tube.conf
new file mode 100644
index 0000000..84c41db
--- /dev/null
+++ b/inlong-audit/conf/audit-proxy-tube.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# The configuration file needs to define the sources,
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent,
+# in this case called 'agent'
+
+agent1.sources = tcp-source
+agent1.channels = ch-msg1 ch-msg2
+agent1.sinks = tube-sink-msg1 tube-sink-msg2
+
+agent1.sources.tcp-source.channels = ch-msg1 ch-msg2
+agent1.sources.tcp-source.type = org.apache.inlong.audit.source.SimpleTcpSource
+agent1.sources.tcp-source.msg-factory-name = org.apache.inlong.audit.source.ServerMessageFactory
+agent1.sources.tcp-source.message-handler-name = org.apache.inlong.audit.source.ServerMessageHandler
+agent1.sources.tcp-source.host = 0.0.0.0
+agent1.sources.tcp-source.port = 10081
+agent1.sources.tcp-source.highWaterMark=2621440
+agent1.sources.tcp-source.enableExceptionReturn=true
+agent1.sources.tcp-source.max-msg-length = 524288
+agent1.sources.tcp-source.topic = test_token
+agent1.sources.tcp-source.attr = m=9
+agent1.sources.tcp-source.connections = 5000
+agent1.sources.tcp-source.max-threads = 64
+agent1.sources.tcp-source.receiveBufferSize = 524288
+agent1.sources.tcp-source.sendBufferSize = 524288
+agent1.sources.tcp-source.custom-cp = true
+agent1.sources.tcp-source.selector.type = org.apache.inlong.audit.channel.FailoverChannelSelector
+agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.tcp-source.metric-recovery-path=./data/recovery
+agent1.sources.tcp-source.set=10
+
+agent1.channels.ch-msg1.type = memory
+agent1.channels.ch-msg1.capacity = 10000
+agent1.channels.ch-msg1.keep-alive = 0
+agent1.channels.ch-msg1.transactionCapacity = 200
+
+agent1.channels.ch-msg2.type = file
+agent1.channels.ch-msg2.capacity = 100000000
+agent1.channels.ch-msg2.maxFileSize = 1073741824
+agent1.channels.ch-msg2.minimumRequiredSpace = 1073741824
+agent1.channels.ch-msg2.checkpointDir =./data/file/ch-msg2/check
+agent1.channels.ch-msg2.dataDirs =./data/file/ch-msg2/data
+agent1.channels.ch-msg2.fsyncPerTransaction = false
+agent1.channels.ch-msg2.fsyncInterval = 10
+
+agent1.sinks.tube-sink-msg1.channel = ch-msg1
+agent1.sinks.tube-sink-msg1.type = org.apache.inlong.audit.sink.TubeSink
+agent1.sinks.tube-sink-msg1.master-host-port-list = TUBE_LIST
+agent1.sinks.tube-sink-msg1.topic = inlong-audit
+agent1.sinks.tube-sink-msg1.send_timeout = 30000
+agent1.sinks.tube-sink-msg1.stat-interval-sec = 60
+agent1.sinks.tube-sink-msg1.thread-num = 8
+agent1.sinks.tube-sink-msg1.client-id-cache = true
+agent1.sinks.tube-sink-msg1.max-survived-time = 300000
+agent1.sinks.tube-sink-msg1.max-survived-size = 3000000
+agent1.sinks.tube-sink-msg1.new-check-pattern = true
+agent1.sinks.tube-sink-msg1.old-metric-on=true
+agent1.sinks.tube-sink-msg1.set=10
+
+agent1.sinks.tube-sink-msg2.channel = ch-msg2
+agent1.sinks.tube-sink-msg2.type = org.apache.inlong.audit.sink.TubeSink
+agent1.sinks.tube-sink-msg2.master-host-port-list = TUBE_LIST
+agent1.sinks.tube-sink-msg2.topic = inlong-audit
+agent1.sinks.tube-sink-msg2.send_timeout = 30000
+agent1.sinks.tube-sink-msg2.stat-interval-sec = 60
+agent1.sinks.tube-sink-msg2.thread-num = 8
+agent1.sinks.tube-sink-msg2.client-id-cache = true
+agent1.sinks.tube-sink-msg2.max-survived-time = 300000
+agent1.sinks.tube-sink-msg2.max-survived-size = 3000000
+agent1.sinks.tube-sink-msg2.new-check-pattern = true
+agent1.sinks.tube-sink-msg2.old-metric-on=true
+agent1.sinks.tube-sink-msg2.set=10