You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/21 03:01:28 UTC

[GitHub] [incubator-inlong] pocozh commented on a change in pull request #3236: [INLONG-3158][Audit] Proxy support TubeMQ

pocozh commented on a change in pull request #3236:
URL: https://github.com/apache/incubator-inlong/pull/3236#discussion_r830725474



##########
File path: inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
##########
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.consts.ConfigConstants;
+import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.audit.utils.NetworkUtils;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TubeSink extends AbstractSink implements Configurable {
+
+    private static final Logger logger = LoggerFactory.getLogger(TubeSink.class);
+
+    // Config key: number of sender threads; configure() defaults it to "4".
+    private static final String SINK_THREAD_NUM = "thread-num";
+
+    // Default for RETRY_CNT. NOTE(review): -1 presumably means "no retry limit" -- confirm where retryCnt is read.
+    private static final int defaultRetryCnt = -1;
+
+    // Default for LOG_EVERY_N_EVENTS.
+    private static final int defaultLogEveryNEvents = 100000;
+
+    // Default for SEND_TIMEOUT.
+    private static final int defaultSendTimeout = 20000; // in milliseconds
+
+    // Delay, in seconds, between runs of tubePerformanceTask (see the static initializer).
+    private static final Long PRINT_INTERVAL = 30L;
+
+    // Statistics task scheduled once per class load on scheduledExecutorService.
+    private static final TubePerformanceTask tubePerformanceTask = new TubePerformanceTask();
+
+    // Capacity of resendQueue.
+    private static final int BAD_EVENT_QUEUE_SIZE = 10000;
+
+    // Capacity of eventQueue, the buffer between process() and the sender threads.
+    private static final int EVENT_QUEUE_SIZE = 1000;
+
+    // Config key (required): TubeMQ master address list.
+    private static final String MASTER_HOST_PORT_LIST = "master-host-port-list";
+
+    // Config key (required, non-empty): topic this sink publishes to.
+    private static final String TOPIC = "topic";
+
+    // Config key: send timeout; configure() requires the value to be > 0.
+    private static final String SEND_TIMEOUT = "send_timeout"; // in milliseconds
+
+    // Config key: logging interval in events; configure() requires the value to be > 0.
+    private static final String LOG_EVERY_N_EVENTS = "log-every-n-events";
+
+    // Config key for the retry count.
+    // NOTE(review): the key text looks like the result of an accidental identifier rename
+    // (possibly meant to be "retry-cnt") -- confirm against the deployed configuration before changing.
+    private static final String RETRY_CNT = "retry-currentSuccSendedCnt";
+
+    // Static, yet overwritten by every configure() call, so the last configured sink wins for all instances.
+    private static int retryCnt = defaultRetryCnt;
+
+    // Class-wide counters; presumably successful send count and byte size -- their updates are not visible here.
+    private static AtomicLong totalTubeSuccSendCnt = new AtomicLong(0);
+
+    private static AtomicLong totalTubeSuccSendSize = new AtomicLong(0);
+
+    // Class-wide topic -> Long map; NOTE(review): meaning of the Long value is not visible here -- confirm.
+    private static ConcurrentHashMap<String, Long> illegalTopicMap =
+            new ConcurrentHashMap<String, Long>();
+
+    // Single-thread scheduler shared by all TubeSink instances; it runs tubePerformanceTask and is
+    // shut down by stop().
+    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1,
+            new HighPriorityThreadFactory("tubePerformance-Printer-thread"));
+
+    /*
+     * Runs once when the class is loaded: schedules tubePerformanceTask on the shared
+     * scheduler with no initial delay and PRINT_INTERVAL seconds between runs.
+     */
+    static {
+        /*
+         * stat tube performance
+         */
+        // Report through the class logger rather than stdout so the message follows the logging config.
+        logger.info("tubePerformanceTask!!!!!!");
+        scheduledExecutorService.scheduleWithFixedDelay(tubePerformanceTask, 0L,
+                PRINT_INTERVAL, TimeUnit.SECONDS);
+    }
+
+    // TubeMQ client handles. NOTE(review): these are public and mutable; their creation is not
+    // visible in this part of the file -- presumably set up by createConnection()/initTopicProducer().
+    public MessageProducer producer;
+    // Replaced with a fresh HashMap on every configure() call.
+    public Map<String, MessageProducer> producerMap;
+    public TubeMultiSessionFactory sessionFactory;
+    // Created lazily in configure(), started in start(), stopped in stop().
+    private SinkCounter sinkCounter;
+    // Topic name read from the TOPIC config key.
+    private String topic;
+    // While false, process() returns BACKOFF without touching the channel; set in start(), cleared in stop().
+    private volatile boolean canTake = false;
+    // Set in start(); cleared in stop() after the wait for eventQueue to drain.
+    private volatile boolean canSend = false;
+    // Bounded by BAD_EVENT_QUEUE_SIZE; presumably holds events awaiting re-send -- producers not visible here.
+    private LinkedBlockingQueue<EventStat> resendQueue;
+    // Bounded by EVENT_QUEUE_SIZE; process() offers events taken from the channel into it.
+    private LinkedBlockingQueue<Event> eventQueue;
+    // Value of "disk-io-rate-per-sec"; 0 leaves diskRateLimiter unset (no throttling in process()).
+    private long diskIORatePerSec;
+    // When set, process() acquires one permit per byte of the event body before queueing the event.
+    private RateLimiter diskRateLimiter;
+    // Value of the MASTER_HOST_PORT_LIST config key.
+    private String masterHostAndPortList;
+    private Integer logEveryNEvents;
+    // Value of SEND_TIMEOUT, in milliseconds.
+    private Integer sendTimeout;
+    // Number of sender threads (SINK_THREAD_NUM); also the length of sinkThreadPool.
+    private int threadNum;
+    // Sender threads created in start(); stop() interrupts them and sets this field to null.
+    private Thread[] sinkThreadPool;
+    // Read from ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT with default 80000.
+    private long linkMaxAllowedDelayedMsgCount;
+    // NOTE(review): the next fields are assigned outside the visible part of the file -- presumably
+    // TubeMQ client tuning values read in configure(); confirm.
+    private long sessionWarnDelayedMsgCount;
+    private long sessionMaxAllowedDelayedMsgCount;
+    private long nettyWriteBufferHighWaterMark;
+    private int recoverthreadcount;
+    private boolean overflow = false;
+    /*
+     * for stat
+     */
+    private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+    private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+    // t1 is initialised to the construction time; how t1/t2 are used is not visible here.
+    private long t1 = System.currentTimeMillis();
+    private long t2 = 0L;
+
+    // Placeholder until configure() overwrites it with NetworkUtils.getLocalIp().
+    private String localIp = "127.0.0.1";
+
+    /**
+     * Creates an unconfigured sink. Nothing is initialised here beyond the field
+     * defaults; the instance only logs its own creation at debug level.
+     */
+    public TubeSink() {
+        logger.debug("new instance of TubeSink!");
+    }
+
+    /**
+     * Starts the sink: opens the tube connection, starts the counter, enables taking and
+     * sending, initialises the producer for the configured topic and launches the sender threads.
+     *
+     * <p>If the connection cannot be created, the sink is stopped again and this method
+     * returns without starting anything. A failure to initialise the topic producer is
+     * logged and start-up continues.
+     */
+    @Override
+    public synchronized void start() {
+        logger.info("tube sink starting...");
+        try {
+            createConnection();
+        } catch (FlumeException e) {
+            logger.error("Unable to create tube client" + ". Exception follows.", e);
+
+            /* Try to prevent leaking resources. */
+            stop();
+            return;
+        }
+
+        sinkCounter.start();
+        super.start();
+        this.canSend = true;
+        this.canTake = true;
+
+        try {
+            initTopicProducer(topic);
+        } catch (Exception e) {
+            // A failure here is not fatal for start-up, but it is not routine information either.
+            logger.warn("tubesink start publish topic fail.", e);
+        }
+
+        // stop() sets sinkThreadPool to null, and stop() is also what the failure path above
+        // calls. Re-create the array so a later start() does not fail with a NullPointerException.
+        if (sinkThreadPool == null) {
+            sinkThreadPool = new Thread[threadNum];
+        }
+        for (int i = 0; i < sinkThreadPool.length; i++) {
+            sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
+            sinkThreadPool[i].start();
+        }
+        logger.debug("tubesink started");
+
+    }
+
+    /**
+     * Stops the sink: tears down the tube connection, stops taking from the channel, waits up
+     * to about ten seconds for the event queue to drain, then disables sending, interrupts the
+     * sender threads and stops the counter.
+     *
+     * <p>Safe to call on an instance that was never configured. If the calling thread is
+     * interrupted while waiting, the wait ends early and the interrupt flag is left set for
+     * the caller.
+     *
+     * <p>NOTE(review): the connection is destroyed before the queue is given time to drain --
+     * confirm that the sender threads can still deliver queued events at that point.
+     * NOTE(review): the scheduler shut down here is static, so stopping one sink ends the
+     * performance task for every TubeSink in the JVM, and start() does not schedule it again.
+     */
+    @Override
+    public synchronized void stop() {
+        logger.info("tubesink stopping");
+        destroyConnection();
+        this.canTake = false;
+        int waitCount = 0;
+        // eventQueue is only created in configure(); skip the wait if it does not exist yet.
+        while (eventQueue != null && !eventQueue.isEmpty() && waitCount++ < 10) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.info("Stop thread has been interrupt!");
+                // Keep the interrupt visible to whoever called stop().
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        this.canSend = false;
+
+        if (sinkThreadPool != null) {
+            for (Thread thread : sinkThreadPool) {
+                if (thread != null) {
+                    thread.interrupt();
+                }
+            }
+            sinkThreadPool = null;
+        }
+
+        super.stop();
+        if (!scheduledExecutorService.isShutdown()) {
+            scheduledExecutorService.shutdown();
+        }
+        // sinkCounter is only created in configure().
+        if (sinkCounter != null) {
+            sinkCounter.stop();
+        }
+        logger.debug("tubesink stopped. Metrics:{}", sinkCounter);
+    }
+
+    @Override
+    public Status process() throws EventDeliveryException {
+        logger.debug("process......");
+        if (!this.canTake) {
+            return Status.BACKOFF;
+        }
+        Status status = Status.READY;
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event != null) {
+                if (diskRateLimiter != null) {
+                    diskRateLimiter.acquire(event.getBody().length);
+                }
+                if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
+                    logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
+                            + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
+                            + "it will cause memoryChannel full and fileChannel write.)", getName());
+                    tx.rollback();
+                } else {
+                    tx.commit();
+                }
+            } else {
+
+                // logger.info("[{}]No data to process in the channel.",getName());
+                status = Status.BACKOFF;
+                tx.commit();
+            }
+        } catch (Throwable t) {
+            logger.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                logger.error("tubesink transaction rollback exception", e);
+
+            }
+        } finally {
+            tx.close();
+        }
+        return status;
+    }
+
+    @Override
+    public void configure(Context context) {
+        logger.info("Tubesink started and context = {}", context.toString());
+
+        topic = context.getString(TOPIC);
+        Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic specified");
+
+        masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
+        Preconditions.checkState(masterHostAndPortList != null,
+                "No master and port list specified");
+
+        producerMap = new HashMap<String, MessageProducer>();
+
+        logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, defaultLogEveryNEvents);
+        logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " + logEveryNEvents);
+        Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
+
+        sendTimeout = context.getInteger(SEND_TIMEOUT, defaultSendTimeout);
+        logger.debug(this.getName() + " " + SEND_TIMEOUT + " " + sendTimeout);
+        Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
+
+        retryCnt = context.getInteger(RETRY_CNT, defaultRetryCnt);
+        logger.debug(this.getName() + " " + RETRY_CNT + " " + retryCnt);
+
+        localIp = NetworkUtils.getLocalIp();
+
+        if (sinkCounter == null) {
+            sinkCounter = new SinkCounter(getName());
+        }
+
+        resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
+
+        String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
+        threadNum = Integer.parseInt(sinkThreadNum);
+        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+        sinkThreadPool = new Thread[threadNum];
+        eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+
+        diskIORatePerSec = context.getLong("disk-io-rate-per-sec", 0L);
+        if (diskIORatePerSec != 0) {
+            diskRateLimiter = RateLimiter.create(diskIORatePerSec);
+        }
+
+        linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
+                80000L);

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org