You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/08/15 07:27:52 UTC

[GitHub] [inlong] gosonzhang commented on a diff in pull request #5545: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink

gosonzhang commented on code in PR #5545:
URL: https://github.com/apache/inlong/pull/5545#discussion_r945482405


##########
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.common;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flume.FlumeException;
+import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TubeProducerHolder {
+    private static final Logger logger =
+            LoggerFactory.getLogger(TubeProducerHolder.class);
+    // NOTE(review): not referenced in the visible part of this class; presumably the
+    // wait (in ms) applied after a send failure - confirm against the rest of the file.
+    private static final long SEND_FAILURE_WAIT = 30000L;
+    // How long (in ms) a topic stays frozen after publish() throws in getProducer().
+    private static final long PUBLISH_FAILURE_WAIT = 60000L;
+    // Lifecycle flag, flipped through compareAndSet in start() and stop().
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final String sinkName;
+    private final String clusterAddr;
+    private final MQClusterConfig clusterConfig;
+    // Created in start(); shut down and reset to null in stop().
+    private TubeMultiSessionFactory sessionFactory = null;
+    // Topic name -> producer; consulted first by getProducer(), cleared in stop().
+    private final Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>();
+    // Producer that newly requested topics are published on; getProducer() replaces it
+    // once lastPubTopicCnt reaches clusterConfig.getMaxTopicsEachProducerHold().
+    private MessageProducer lastProducer = null;
+    // Compared against the per-producer topic limit in getProducer(); the same object
+    // is also the monitor getProducer() synchronizes on.
+    private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0);
+    // Topic name -> timestamp (System.currentTimeMillis() based) before which
+    // getProducer() returns null for that topic.
+    // NOTE(review): this map is static, yet the instance method stop() clears it.
+    private static final ConcurrentHashMap<String, AtomicLong> frozenTopicMap
+            = new ConcurrentHashMap<>();
+
+    public TubeProducerHolder(String sinkName, String clusterAddr, MQClusterConfig tubeConfig) {
+        Preconditions.checkState(StringUtils.isNotBlank(clusterAddr),
+                "No TubeMQ's cluster address list specified");
+        this.sinkName = sinkName;
+        this.clusterAddr = clusterAddr;
+        this.clusterConfig = tubeConfig;
+    }
+
+    /**
+     * Start the holder: build the TubeMQ session factory and create the
+     * producers for the configured topics.
+     *
+     * Only a call that flips the started flag from false to true does any work;
+     * any other call just logs and returns.
+     *
+     * @param configTopicSet  the topic set handed to createProducersByTopicSet
+     * @throws FlumeException if building the session factory or creating the
+     *                        producers fails; the original failure is attached as cause
+     */
+    public void start(Set<String> configTopicSet) {
+        if (!this.started.compareAndSet(false, true)) {
+            logger.info("ProducerHolder for {} has started!", sinkName);
+            return;
+        }
+        logger.info("ProducerHolder for {} begin to start!", sinkName);
+        // create session factory
+        try {
+            TubeClientConfig clientConfig = TubeUtils.buildClientConfig(clusterAddr, this.clusterConfig);
+            this.sessionFactory = new TubeMultiSessionFactory(clientConfig);
+            createProducersByTopicSet(configTopicSet);
+        } catch (Throwable e) {
+            // release whatever was built before the failure
+            stop();
+            String errInfo = "Build session factory  to " + clusterAddr
+                    + " for " + sinkName + " failure, please re-check";
+            logger.error(errInfo, e);
+            // keep the root cause so callers can see why the start failed
+            throw new FlumeException(errInfo, e);
+        }
+        logger.info("ProducerHolder for {} started!", sinkName);
+    }
+
+    public void stop() {
+        if (this.started.get()) {
+            return;
+        }
+        // change start flag
+        if (!this.started.compareAndSet(true, false)) {
+            logger.info("ProducerHolder for " + sinkName + " has stopped!");
+            return;
+        }
+        logger.info("ProducerHolder for " + sinkName + " begin to stop!");
+        for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
+            if (entry == null || entry.getValue() == null) {
+                continue;
+            }
+            try {
+                entry.getValue().shutdown();
+            } catch (Throwable e) {
+                // ignore log
+            }
+        }
+        producerMap.clear();
+        lastProducer = null;
+        lastPubTopicCnt.set(0);
+        frozenTopicMap.clear();
+        if (sessionFactory != null) {
+            try {
+                sessionFactory.shutdown();
+            } catch (Throwable e) {
+                // ignore log
+            }
+            sessionFactory = null;
+        }
+        logger.info("ProducerHolder for " + sinkName + " finished stop!");
+    }
+
+    /**
+     * Get producer by topic name:
+     *   i. if the topic is judged to be an illegal topic, return null;
+     *   ii. if it is not an illegal topic or the status has expired, check:
+     *    a. if the topic has been published before, return the corresponding producer directly;
+     *    b. if the topic is not in the published list, perform the topic's publish action.
+     *  If the topic is thrown exception during the publishing process,
+     *     set the topic to an illegal topic
+     *
+     * @param topicName  the topic name
+     *
+     * @return  the producer
+     *          if topic is illegal, return null
+     * @throws  TubeClientException
+     */
+    public MessageProducer getProducer(String topicName) throws TubeClientException {
+        AtomicLong fbdTime = frozenTopicMap.get(topicName);
+        if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) {
+            return null;
+        }
+        MessageProducer tmpProducer = producerMap.get(topicName);
+        if (tmpProducer != null) {
+            if (fbdTime != null) {
+                frozenTopicMap.remove(topicName);
+            }
+            return tmpProducer;
+        }
+        synchronized (lastPubTopicCnt) {
+            fbdTime = frozenTopicMap.get(topicName);
+            if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) {
+                return null;
+            }
+            if (lastProducer == null
+                    || lastPubTopicCnt.get() >= clusterConfig.getMaxTopicsEachProducerHold()) {
+                lastProducer = sessionFactory.createProducer();
+                lastPubTopicCnt.set(0);
+            }
+            try {
+                lastProducer.publish(topicName);
+            } catch (Throwable e) {
+                fbdTime = frozenTopicMap.get(topicName);
+                if (fbdTime == null) {
+                    AtomicLong tmpFbdTime = new AtomicLong();
+                    fbdTime = frozenTopicMap.putIfAbsent(topicName, tmpFbdTime);
+                    if (fbdTime == null) {
+                        fbdTime = tmpFbdTime;
+                    }
+                }
+                fbdTime.set(System.currentTimeMillis() + PUBLISH_FAILURE_WAIT);
+                logger.warn("Throw exception while publish topic="

Review Comment:
   No need; the error message from TubeMQ already reports the cause of the exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org