You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/10 02:42:39 UTC

[incubator-inlong] branch master updated: [INLONG-2076] Tube sink of DataProxy support new Message format. (#2092)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c5c6db4  [INLONG-2076] Tube sink of DataProxy support new Message format. (#2092)
c5c6db4 is described below

commit c5c6db4a4fd5b9cda533e9e0b9bff05e39e3e35f
Author: 卢春亮 <94...@qq.com>
AuthorDate: Mon Jan 10 10:42:35 2022 +0800

    [INLONG-2076] Tube sink of DataProxy support new Message format. (#2092)
---
 .../dataproxy/sink/SimpleMessageTubeSink.java      | 816 +++++++++++++++++++++
 1 file changed, 816 insertions(+)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
new file mode 100644
index 0000000..0a63148
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.source.shaded.guava.RateLimiter;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.dataproxy.config.ConfigManager;
+import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.dataproxy.utils.NetworkUtils;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
/**
 * Flume sink that forwards DataProxy events to TubeMQ.
 *
 * <p>Events taken from the channel in {@link #process()} are buffered in an in-memory
 * queue and sent asynchronously by a pool of {@code SinkTask} threads, each resolving
 * the target topic from the event headers. Failed sends are retried via a resend queue,
 * and duplicate agent packages can be discarded through an id cache.
 */
public class SimpleMessageTubeSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageTubeSink.class);
    // Max topics one underlying producer publishes before a fresh producer is created.
    private static int MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
    private static final String TUBE_REQUEST_TIMEOUT = "tube-request-timeout";
    private static final String KEY_DISK_IO_RATE_PER_SEC = "disk-io-rate-per-sec";

    private static int BAD_EVENT_QUEUE_SIZE = 10000;

    private static final String SINK_THREAD_NUM = "thread-num";
    private static int EVENT_QUEUE_SIZE = 1000;
    // Lifecycle flags: canTake gates process(), canSend keeps the SinkTask loops alive.
    private volatile boolean canTake = false;
    private volatile boolean canSend = false;
    private static int BATCH_SIZE = 10000;
    // Defaults for the configurable settings below.
    private static final int defaultRetryCnt = -1;
    private static final int defaultLogEveryNEvents = 100000;
    private static final int defaultSendTimeout = 20000; // in millsec
    private static final int defaultStatIntervalSec = 60;
    private static final int sendNewMetricRetryCount = 3;

    // Flume context configuration keys.
    private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
    private static String TOPIC = "topic";
    private static String SEND_TIMEOUT = "send_timeout"; // in millsec
    private static String LOG_EVERY_N_EVENTS = "log-every-n-events";
    private static String RETRY_CNT = "retry-currentSuccSendedCnt";
    private static String STAT_INTERVAL_SEC = "stat-interval-sec"; // in sec
    private static String MAX_TOPICS_EACH_PRODUCER_HOLD_NAME = "max-topic-each-producer-hold";

    // Proxy-log reporting settings and topic-properties file names.
    private static final String LOG_TOPIC = "proxy-log-topic";
    private static final String STREAMID = "proxy-log-streamid";
    private static final String GROUPID = "proxy-log-groupid";
    private static final String SEND_REMOTE = "send-remote";
    private static final String topicsFilePath = "topics.properties";
    private static final String slaTopicFilePath = "slaTopics.properties";
    private static final String SLA_METRIC_SINK = "sla-metric-sink";

    // Client-id (dedup) cache configuration keys.
    private static String MAX_SURVIVED_TIME = "max-survived-time";
    private static String MAX_SURVIVED_SIZE = "max-survived-size";
    private static String CLIENT_ID_CACHE = "client-id-cache";

    private int maxSurvivedTime = 3 * 1000 * 30;
    private int maxSurvivedSize = 100000;

    private String proxyLogTopic = "teg_manager";
    private String proxyLogGroupId = "b_teg_manager";
    private String proxyLogStreamId = "proxy_measure_log";
    private boolean sendRemote = false;
    private ConfigManager configManager;
    // Cached snapshot of topic config; refreshed on config-update callbacks.
    private Map<String, String> topicProperties;

    // Current producer used for newly published topics, plus topic -> producer lookup.
    public MessageProducer producer;
    public Map<String, MessageProducer> producerMap;

    // resendQueue: events whose async send failed; drained before eventQueue by SinkTask.
    private LinkedBlockingQueue<EventStat> resendQueue;
    private LinkedBlockingQueue<Event> eventQueue;

    private long diskIORatePerSec;
    private RateLimiter diskRateLimiter;

    // Topics published on the current producer; reset when a new producer is created.
    public AtomicInteger currentPublishTopicNum = new AtomicInteger(0);
    public TubeMultiSessionFactory sessionFactory;
    private String masterHostAndPortList;
    private Integer logEveryNEvents;
    private Integer sendTimeout;
    private static int retryCnt = defaultRetryCnt;
    private int requestTimeout = 60;
    private int threadNum;
    private Thread[] sinkThreadPool;

    private String metaTopicFilePath = topicsFilePath;
    private long linkMaxAllowedDelayedMsgCount;
    private long sessionWarnDelayedMsgCount;
    private long sessionMaxAllowedDelayedMsgCount;
    private long nettyWriteBufferHighWaterMark;
    private int recoverthreadcount;
    //
    private Map<String, String> dimensions;
    private DataProxyMetricItemSet metricItemSet;

    // New-style dedup cache (used when isNewCache): client-id -> last-seen timestamp,
    // entries expire 30s after last access.
    private static final LoadingCache<String, Long> agentIdCache = CacheBuilder
            .newBuilder().concurrencyLevel(4 * 8).initialCapacity(5000000).expireAfterAccess(30, TimeUnit.SECONDS)
            .build(new CacheLoader<String, Long>() {

                @Override
                public Long load(String key) {
                    return System.currentTimeMillis();
                }
            });

    // Old-style dedup cache (used when !isNewCache), swept by IdCacheCleaner.
    private IdCacheCleaner idCacheCleaner;
    protected static boolean idCleanerStarted = false;
    protected static final ConcurrentHashMap<String, Long> agentIdMap =
            new ConcurrentHashMap<String, Long>();
    // Topics temporarily blacklisted (value = expiry timestamp in ms).
    private static ConcurrentHashMap<String, Long> illegalTopicMap =
            new ConcurrentHashMap<String, Long>();

    private boolean clientIdCache = false;
    private boolean isNewCache = true;

    // Set by MyCallback when the broker reports receive overflow; throttles SinkTask.
    private boolean overflow = false;
+
+    /**
+     * diff publish
+     *
+     * @param originalSet
+     * @param endSet
+     */
+    public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
+
+        boolean changed = false;
+        for (String s : endSet) {
+            if (!originalSet.contains(s)) {
+                changed = true;
+                try {
+                    producer = getProducer(s);
+                } catch (Exception e) {
+                    logger.error("Get producer failed!", e);
+                }
+            }
+        }
+
+        if (changed) {
+            logger.info("topics.properties has changed, trigger diff publish for {}", getName());
+            topicProperties = configManager.getTopicProperties();
+        }
+    }
+
+    private MessageProducer getProducer(String topic) throws TubeClientException {
+        if (producerMap.containsKey(topic)) {
+            return producerMap.get(topic);
+        } else {
+            synchronized (this) {
+                if (!producerMap.containsKey(topic)) {
+                    if (producer == null || currentPublishTopicNum.get() >= MAX_TOPICS_EACH_PRODUCER_HOLD) {
+                        producer = sessionFactory.createProducer();
+                        currentPublishTopicNum.set(0);
+                    }
+                    // publish topic
+                    producer.publish(topic);
+                    producerMap.put(topic, producer);
+                    currentPublishTopicNum.incrementAndGet();
+                }
+            }
+            return producerMap.get(topic);
+        }
+    }
+
+    private TubeClientConfig initTubeConfig() throws Exception {
+        final TubeClientConfig tubeClientConfig = new TubeClientConfig(NetworkUtils.getLocalIp(),
+                this.masterHostAndPortList);
+        tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
+        tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
+        tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
+        tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
+        tubeClientConfig.setHeartbeatPeriodMs(15000L);
+        tubeClientConfig.setRpcTimeoutMs(20000L);
+
+        return tubeClientConfig;
+    }
+
+    /**
+     * If this function is called successively without calling {@see #destroyConnection()}, only the
+     * first call has any effect.
+     *
+     * @throws FlumeException if an RPC client connection could not be opened
+     */
+    private void createConnection() throws FlumeException {
+//        synchronized (tubeSessionLock) {
+        // if already connected, just skip
+        if (sessionFactory != null) {
+            return;
+        }
+
+        try {
+            TubeClientConfig conf = initTubeConfig();
+            //sessionFactory = new TubeMutilMessageSessionFactory(conf);
+            sessionFactory = new TubeMultiSessionFactory(conf);
+        } catch (TubeClientException e) {
+            logger.error("create connnection error in metasink, "
+                    + "maybe tube master set error, please re-check. ex1 {}", e.getMessage());
+            throw new FlumeException("connect to Tube error1, "
+                    + "maybe zkstr/zkroot set error, please re-check");
+        } catch (Throwable e) {
+            logger.error("create connnection error in metasink, "
+                            + "maybe tube master set error/shutdown in progress, please re-check. ex2 {}",
+                    e.getMessage());
+            throw new FlumeException("connect to meta error2, "
+                    + "maybe tube master set error/shutdown in progress, please re-check");
+        }
+
+        if (producerMap == null) {
+            producerMap = new HashMap<String, MessageProducer>();
+        }
+        logger.debug("building tube producer");
+//        }
+    }
+
+    private void destroyConnection() {
+        for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
+            MessageProducer producer = entry.getValue();
+            try {
+                producer.shutdown();
+            } catch (TubeClientException e) {
+                logger.error("destroy producer error in metasink, MetaClientException {}", e.getMessage());
+            } catch (Throwable e) {
+                logger.error("destroy producer error in metasink, ex {}", e.getMessage());
+            }
+        }
+        producerMap.clear();
+
+        if (sessionFactory != null) {
+            try {
+                sessionFactory.shutdown();
+            } catch (TubeClientException e) {
+                logger.error("destroy sessionFactory error in metasink, MetaClientException {}",
+                        e.getMessage());
+            } catch (Exception e) {
+                logger.error("destroy sessionFactory error in metasink, ex {}", e.getMessage());
+            }
+        }
+        sessionFactory = null;
+        logger.debug("closed meta producer");
+    }
+
+    private void initTopicSet(Set<String> topicSet) throws Exception {
+        List<String> sortedList = new ArrayList(topicSet);
+        Collections.sort(sortedList);
+        int cycle = sortedList.size() / MAX_TOPICS_EACH_PRODUCER_HOLD;
+        int remainder = sortedList.size() % MAX_TOPICS_EACH_PRODUCER_HOLD;
+        long startTime = System.currentTimeMillis();
+        for (int i = 0; i <= cycle; i++) {
+            Set<String> subset = new HashSet<String>();
+            int startIndex = i * MAX_TOPICS_EACH_PRODUCER_HOLD;
+            int endIndex = startIndex + MAX_TOPICS_EACH_PRODUCER_HOLD - 1;
+            if (i == cycle) {
+                if (remainder == 0) {
+                    continue;
+                } else {
+                    endIndex = startIndex + remainder - 1;
+                }
+            }
+            for (int index = startIndex; index <= endIndex; index++) {
+                subset.add(sortedList.get(index));
+            }
+            producer = sessionFactory.createProducer();
+            try {
+                Set<String> succTopicSet = producer.publish(subset);
+                if (succTopicSet != null) {
+                    for (String succTopic : succTopicSet) {
+                        producerMap.put(succTopic, producer);
+                    }
+                    currentPublishTopicNum.set(succTopicSet.size());
+                    logger.info(getName() + " success Subset  : " + succTopicSet);
+                }
+            } catch (Exception e) {
+                logger.info(getName() + " meta sink initTopicSet fail.", e);
+            }
+        }
+        logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms");
+        logger.info(getName() + " producer is ready for topics : " + producerMap.keySet());
+    }
+
    /**
     * Start the sink: register metrics, connect to Tube, optionally start the id-cache
     * cleaner, publish the configured topics, and spin up the sender thread pool.
     * If the Tube connection cannot be created the sink stops itself.
     */
    @Override
    public void start() {
        this.dimensions = new HashMap<>();
        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
        //register metrics
        this.metricItemSet = new DataProxyMetricItemSet(this.getName());
        MetricRegister.register(metricItemSet);

        //create tube connection
        try {
            createConnection();
        } catch (FlumeException e) {
            logger.error("Unable to create tube client" + ". Exception follows.", e);

            /* Try to prevent leaking resources. */
            destroyConnection();

            /* FIXME: Mark ourselves as failed. */
            stop();
            return;
        }

        // start the cleaner thread (only needed for the old-style agentIdMap cache)
        if (clientIdCache && !isNewCache) {
            idCacheCleaner = new IdCacheCleaner(this, maxSurvivedTime, maxSurvivedSize);
            idCacheCleaner.start();
        }

        // enable take/send before publishing topics so sender threads can run immediately
        super.start();
        this.canSend = true;
        this.canTake = true;

        try {
            initTopicSet(new HashSet<String>(topicProperties.values()));
        } catch (Exception e) {
            logger.info("meta sink start publish topic fail.", e);
        }

        // one SinkTask per configured sender thread
        for (int i = 0; i < sinkThreadPool.length; i++) {
            sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i);
            sinkThreadPool[i].start();
        }

    }
+
    /**
     * Sender loop: drains the resend queue first, then the event queue, resolves the
     * target topic from event headers, and sends each event asynchronously via Tube.
     * Duplicate agent packages (same SEQUENCE_ID) are discarded when client-id caching
     * is enabled. Topics that are temporarily unusable are parked in illegalTopicMap.
     */
    class SinkTask implements Runnable {
        /**
         * Send one event to {@code topic} unless its client id was already seen in the
         * dedup cache; on a successful async submit sets {@code flag} to true.
         * Uses agentIdMap (old cache) or agentIdCache (new cache) depending on isNewCache.
         */
        private void sendMessage(Event event, String topic, AtomicBoolean flag, EventStat es)
            throws TubeClientException, InterruptedException {
            String clientId = event.getHeaders().get(ConfigConstants.SEQUENCE_ID);
            if (!isNewCache) {
                Long lastTime = 0L;
                if (clientIdCache && clientId != null) {
                    // put returns the previous timestamp, i.e. non-null when already seen
                    lastTime = agentIdMap.put(clientId, System.currentTimeMillis());
                }
                if (clientIdCache && clientId != null && lastTime != null && lastTime > 0) {
                    logger.info("{} agent package {} existed,just discard.", getName(), clientId);
                } else {
                    Message message = this.parseEvent2Message(topic, event);
                    producer.sendMessage(message, new MyCallback(es));
                    flag.set(true);

                }
            } else {
                boolean hasKey = false;
                if (clientIdCache && clientId != null) {
                    hasKey = agentIdCache.asMap().containsKey(clientId);
                }

                if (clientIdCache && clientId != null && hasKey) {
                    agentIdCache.put(clientId, System.currentTimeMillis());
                    logger.info("{} agent package {} existed,just discard.", getName(), clientId);
                } else {
                    if (clientId != null) {
                        agentIdCache.put(clientId, System.currentTimeMillis());
                    }

                    Message message = this.parseEvent2Message(topic, event);
                    producer.sendMessage(message, new MyCallback(es));
                    flag.set(true);
                }
            }
            // a send attempt reached this topic, so clear any blacklist entry for it
            illegalTopicMap.remove(topic);
        }

        /**
         * Build a Tube {@link Message} from a Flume event: body is copied verbatim,
         * stream id comes from the INTERFACE_ID (or INAME) header, and the common
         * group/stream/topic/time/source-ip headers are copied as message attributes.
         * NOTE(review): header values may be null here and are passed to setAttrKeyVal
         * as-is — confirm the Message API tolerates null values.
         *
         * @param topic target Tube topic
         * @param event source Flume event
         * @return the message ready to send
         */
        private Message parseEvent2Message(String topic, Event event) {
            Message message = new Message(topic, event.getBody());
            message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
            String streamId = "";
            if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
                streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
            } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
                streamId = event.getHeaders().get(AttributeConstants.INAME);
            }
            message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
            // common attributes
            Map<String, String> headers = event.getHeaders();
            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID));
            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID));
            message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC));
            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME));
            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP));
            return message;
        }

        /**
         * Handle a send failure: topics reported unavailable by Tube are blacklisted
         * for 60 seconds; other Tube client errors back off 100 ms before logging.
         */
        private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) {
            if (t instanceof TubeClientException) {
                String message = t.getMessage();
                if (message != null && (message.contains("No available queue for topic")
                    || message.contains("The brokers of topic are all forbidden"))) {
                    illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000);
                    logger.info("IllegalTopicMap.put " + topic);
                    return;
                } else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        //ignore..
                    }
                }
            }
            logger.error("Sink task fail to send the message, decrementFlag=" + decrementFlag + ",sink.name="
                + Thread.currentThread().getName()
                + ",event.headers=" + es.getEvent().getHeaders(), t);
        }

        @Override
        public void run() {
            logger.info("Sink task {} started.", Thread.currentThread().getName());
            while (canSend) {
                boolean decrementFlag = false;
                boolean resendBadEvent = false;
                Event event = null;
                EventStat es = null;
                String topic = null;
                try {
                    // brief pause when the broker reported receive overflow
                    if (SimpleMessageTubeSink.this.overflow) {
                        SimpleMessageTubeSink.this.overflow = false;
                        Thread.sleep(10);
                    }
                    // failed events take priority over fresh ones
                    if (!resendQueue.isEmpty()) {
                        es = resendQueue.poll();
                        if (es != null) {
                            event = es.getEvent();
                            // logger.warn("Resend event: {}", event.toString());
                            if (event.getHeaders().containsKey(TOPIC)) {
                                topic = event.getHeaders().get(TOPIC);
                            }
                            resendBadEvent = true;
                        }
                    } else {
                        event = eventQueue.take();
                        es = new EventStat(event);
//                            sendCnt.incrementAndGet();
                        if (event.getHeaders().containsKey(TOPIC)) {
                            topic = event.getHeaders().get(TOPIC);
                        }
                    }

                    if (event == null) {
                        // ignore event is null, when multiple-thread SinkTask running
                        // this null value comes from resendQueue
                        continue;
                    }

                    if (topic == null || topic.equals("")) {
                        logger.warn("no topic specified in event header, just skip this event");
                        continue;
                    }

                    // skip topics that are currently blacklisted
                    Long expireTime = illegalTopicMap.get(topic);
                    if (expireTime != null) {
                        long currentTime = System.currentTimeMillis();
                        if (expireTime > currentTime) {

                            // TODO: need to be improved.
//                            reChannelEvent(es, topic);
                            continue;
                        } else {

                            illegalTopicMap.remove(topic);
                        }
                    }
                    MessageProducer producer = null;
                    try {
                        producer = getProducer(topic);
                    } catch (Exception e) {
                        logger.error("Get producer failed!", e);
                    }

                    if (producer == null) {
                        // no producer available: blacklist the topic for 30 seconds
                        illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
                        continue;
                    }

                    AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
                    sendMessage(event, topic, flagAtomic, es);
                    decrementFlag = flagAtomic.get();

                } catch (InterruptedException e) {
                    logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                    return;
                } catch (Throwable t) {
                    handleException(t, topic, decrementFlag, es);
                    resendEvent(es, decrementFlag);
                }
            }
        }
    }
+
+    public class MyCallback implements MessageSentCallback {
+        private EventStat myEventStat;
+        private long sendTime;
+
+        public MyCallback(EventStat eventStat) {
+            this.myEventStat = eventStat;
+            this.sendTime = System.currentTimeMillis();
+        }
+
+        @Override
+        public void onMessageSent(final MessageSentResult result) {
+            if (result.isSuccess()) {
+                // TODO: add stats
+                this.addMetric(myEventStat.getEvent(), true, sendTime);
+            } else {
+                this.addMetric(myEventStat.getEvent(), false, 0);
+                if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
+                    logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
+                            result.getErrMsg(), resendQueue.size(),
+                            myEventStat.getEvent().hashCode());
+
+                    return;
+                }
+                if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) {
+                    logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
+                            result.getErrMsg(), resendQueue.size(),
+                            myEventStat.getEvent().hashCode());
+                }
+                resendEvent(myEventStat, true);
+            }
+        }
+
+        /**
+         * addMetric
+         * 
+         * @param currentRecord
+         * @param topic
+         * @param result
+         * @param size
+         */
+        private void addMetric(Event currentRecord, boolean result, long sendTime) {
+            Map<String, String> dimensions = new HashMap<>();
+            dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, SimpleMessageTubeSink.this.getName());
+            dimensions.put(DataProxyMetricItem.KEY_SINK_ID, SimpleMessageTubeSink.this.getName());
+            if (currentRecord.getHeaders().containsKey(TOPIC)) {
+                dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, currentRecord.getHeaders().get(TOPIC));
+            } else {
+                dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
+            }
+            DataProxyMetricItem metricItem = SimpleMessageTubeSink.this.metricItemSet.findMetricItem(dimensions);
+            if (result) {
+                metricItem.sendSuccessCount.incrementAndGet();
+                metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
+                if (sendTime > 0) {
+                    long currentTime = System.currentTimeMillis();
+                    long msgTime = NumberUtils.toLong(currentRecord.getHeaders().get(Constants.HEADER_KEY_MSG_TIME),
+                            sendTime);
+                    long sinkDuration = currentTime - sendTime;
+                    long nodeDuration = currentTime - NumberUtils.toLong(Constants.HEADER_KEY_SOURCE_TIME, msgTime);
+                    long wholeDuration = currentTime - msgTime;
+                    metricItem.sinkDuration.addAndGet(sinkDuration);
+                    metricItem.nodeDuration.addAndGet(nodeDuration);
+                    metricItem.wholeDuration.addAndGet(wholeDuration);
+                }
+            } else {
+                metricItem.sendFailCount.incrementAndGet();
+                metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
+            }
+        }
+
+        @Override
+        public void onException(final Throwable e) {
+            Throwable t = e;
+            while (t.getCause() != null) {
+                t = t.getCause();
+            }
+            if (t instanceof OverflowException) {
+                SimpleMessageTubeSink.this.overflow = true;
+            }
+            resendEvent(myEventStat, true);
+        }
+    }
+
    /**
     * Clear the dedup-cache entry for a failed event's client id so a later retry of
     * the same package is not discarded as a duplicate.
     *
     * <p>NOTE(review): despite its name and the error message mentioning resendQueue,
     * this method never re-enqueues {@code es} into resendQueue — the event is
     * effectively dropped here. Confirm whether an {@code resendQueue.offer(es)} was
     * intended, as in other metasink implementations.
     *
     * @param es          the event (with stats) whose send failed; may be null
     * @param isDecrement unused here; carried through from the caller's bookkeeping
     */
    private void resendEvent(EventStat es, boolean isDecrement) {
        try {
            if (es == null || es.getEvent() == null) {
                return;
            }

            if (clientIdCache) {
                String clientId = es.getEvent().getHeaders().get(ConfigConstants.SEQUENCE_ID);
                if (!isNewCache) {
                    if (clientId != null && agentIdMap.containsKey(clientId)) {
                        agentIdMap.remove(clientId);
                    }
                } else {
                    if (clientId != null && agentIdCache.asMap().containsKey(clientId)) {
                        agentIdCache.invalidate(clientId);
                    }
                }
            }
        } catch (Throwable throwable) {
            logger.error(getName() + " Discard msg because put events to both of queue and "
                    + "fileChannel fail,current resendQueue.size = "
                    + resendQueue.size(), throwable);
        }
    }
+
+    /**
+     * Takes one event from the Flume channel and hands it to the in-memory
+     * {@code eventQueue} consumed by the sink worker threads that send to Tube.
+     *
+     * <p>The channel transaction is committed only after the event has been
+     * queued; if the queue is full for 3 seconds the transaction is rolled
+     * back so the event stays in the channel.
+     *
+     * @return {@code Status.READY} when an event was taken and queued,
+     *         {@code Status.BACKOFF} when the sink cannot take yet or the
+     *         channel is empty
+     * @throws EventDeliveryException on unrecoverable delivery errors
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        if (!this.canTake) {
+            return Status.BACKOFF;
+        }
+        Status status = Status.READY;
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event != null) {
+                if (diskRateLimiter != null) {
+                    // throttle by event body size to cap disk IO rate
+                    diskRateLimiter.acquire(event.getBody().length);
+                }
+                if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
+                    logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
+                            + "--> Tube,Check if Tube server or network is ok.(if this situation last long time "
+                            + "it will cause memoryChannel full and fileChannel write.)", getName());
+                    tx.rollback();
+                } else {
+                    tx.commit();
+                    // metric: tag by topic header when present, empty otherwise
+                    if (event.getHeaders().containsKey(TOPIC)) {
+                        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(TOPIC));
+                    } else {
+                        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
+                    }
+                    DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
+                    // This is the success path (event committed and queued), so
+                    // count it as a successful read, not a failed one.
+                    metricItem.readSuccessCount.incrementAndGet();
+                    metricItem.readSuccessSize.addAndGet(event.getBody().length);
+                }
+            } else {
+                // channel empty: back off so Flume does not busy-spin this sink
+                status = Status.BACKOFF;
+                tx.commit();
+            }
+        } catch (Throwable t) {
+            logger.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                logger.error("metasink transaction rollback exception", e);
+            }
+        } finally {
+            tx.close();
+        }
+        return status;
+    }
+
+    /**
+     * Configures the sink from the Flume context: topic mapping with hot
+     * reload, producer limits, timeouts, client-id cache, remote-log topic,
+     * queue sizes, optional disk IO rate limiting and Tube flow-control
+     * thresholds.
+     *
+     * @param context sink configuration supplied by Flume
+     */
+    @Override
+    public void configure(Context context) {
+        logger.info(context.toString());
+
+        configManager = ConfigManager.getInstance();
+        topicProperties = configManager.getTopicProperties();
+        // re-publish producers whenever the topic configuration changes
+        configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
+            @Override
+            public void update() {
+                diffSetPublish(new HashSet<String>(topicProperties.values()),
+                        new HashSet<String>(configManager.getTopicProperties().values()));
+            }
+        });
+
+        masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
+        Preconditions.checkState(masterHostAndPortList != null, "No master and port list specified");
+
+        producerMap = new HashMap<String, MessageProducer>();
+
+        logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, defaultLogEveryNEvents);
+        logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " + logEveryNEvents);
+        Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must be > 0");
+
+        sendTimeout = context.getInteger(SEND_TIMEOUT, defaultSendTimeout);
+        logger.debug(this.getName() + " " + SEND_TIMEOUT + " " + sendTimeout);
+        Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 0");
+
+        MAX_TOPICS_EACH_PRODUCER_HOLD = context.getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD_NAME, 200);
+        retryCnt = context.getInteger(RETRY_CNT, defaultRetryCnt);
+        logger.debug(this.getName() + " " + RETRY_CNT + " " + retryCnt);
+
+        boolean isSlaMetricSink = context.getBoolean(SLA_METRIC_SINK, false);
+        if (isSlaMetricSink) {
+            this.metaTopicFilePath = slaTopicFilePath;
+        }
+
+        clientIdCache = context.getBoolean(CLIENT_ID_CACHE, clientIdCache);
+        if (clientIdCache) {
+            // keep configured defaults when the supplied values are invalid
+            int survivedTime = context.getInteger(MAX_SURVIVED_TIME, maxSurvivedTime);
+            if (survivedTime > 0) {
+                maxSurvivedTime = survivedTime;
+            } else {
+                logger.warn("invalid {}:{}", MAX_SURVIVED_TIME, survivedTime);
+            }
+
+            int survivedSize = context.getInteger(MAX_SURVIVED_SIZE, maxSurvivedSize);
+            if (survivedSize > 0) {
+                maxSurvivedSize = survivedSize;
+            } else {
+                logger.warn("invalid {}:{}", MAX_SURVIVED_SIZE, survivedSize);
+            }
+        }
+
+        String requestTimeout = context.getString(TUBE_REQUEST_TIMEOUT);
+        if (requestTimeout != null) {
+            this.requestTimeout = Integer.parseInt(requestTimeout);
+        }
+
+        String sendRemoteStr = context.getString(SEND_REMOTE);
+        if (sendRemoteStr != null) {
+            sendRemote = Boolean.parseBoolean(sendRemoteStr);
+        }
+        if (sendRemote) {
+            proxyLogTopic = context.getString(LOG_TOPIC, proxyLogTopic);
+            // NOTE: default must be the group-id field itself (the previous
+            // code used proxyLogStreamId as the GROUPID default by mistake)
+            proxyLogGroupId = context.getString(GROUPID, proxyLogGroupId);
+            proxyLogStreamId = context.getString(STREAMID, proxyLogStreamId);
+        }
+
+        resendQueue = new LinkedBlockingQueue<>(BAD_EVENT_QUEUE_SIZE);
+
+        String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
+        threadNum = Integer.parseInt(sinkThreadNum);
+        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+        sinkThreadPool = new Thread[threadNum];
+        eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+
+        diskIORatePerSec = context.getLong(KEY_DISK_IO_RATE_PER_SEC, 0L);
+        if (diskIORatePerSec != 0) {
+            // 0 means unlimited: only create the limiter when a rate is set
+            diskRateLimiter = RateLimiter.create(diskIORatePerSec);
+        }
+
+        linkMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
+                80000L);
+        sessionWarnDelayedMsgCount = context.getLong(ConfigConstants.SESSION_WARN_DELAYED_MSG_COUNT,
+                2000000L);
+        sessionMaxAllowedDelayedMsgCount = context.getLong(ConfigConstants.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
+                4000000L);
+        nettyWriteBufferHighWaterMark = context.getLong(ConfigConstants.NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
+                15 * 1024 * 1024L);
+        recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT,
+                Runtime.getRuntime().availableProcessors() + 1);
+    }
+
+    /**
+     * Returns the metric item set this sink records its metrics into.
+     *
+     * @return the {@code DataProxyMetricItemSet} of this sink
+     */
+    public DataProxyMetricItemSet getMetricItemSet() {
+        return this.metricItemSet;
+    }
+    
+}