You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/11/07 08:02:38 UTC

[inlong] branch master updated: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aea39cc19 [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413)
aea39cc19 is described below

commit aea39cc19283ccdfa696ce8470b8e8ac60bed42f
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Nov 7 16:02:32 2022 +0800

    [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413)
---
 .../inlong/common/enums/DataProxyErrCode.java      |  34 +++---
 .../inlong/dataproxy/config/ConfigManager.java     |  11 ++
 .../config/remote/ConfigMessageServlet.java        |   7 +-
 .../inlong/dataproxy/http/MessageFilter.java       | 117 ++++++++++++---------
 .../dataproxy/http/MessageProcessServlet.java      |   3 +-
 .../apache/inlong/dataproxy/http/StatusCode.java   |  41 --------
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |  11 +-
 .../org/apache/inlong/dataproxy/sink/TubeSink.java |  54 +++++++---
 .../dataproxy/sink/common/TubeProducerHolder.java  |   2 +-
 .../dataproxy/sink/pulsar/PulsarClientService.java |   6 +-
 .../dataproxy/source/ServerMessageHandler.java     |  18 +++-
 11 files changed, 175 insertions(+), 129 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 8943ebfd3..732f376e0 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -25,19 +25,27 @@ public enum DataProxyErrCode {
 
     SUCCESS(0, "Ok"),
 
-    UNSUPPORTED_MSG_TYPE(1, "Unsupported msgType"),
-    EMPTY_MSG(2, "Empty message"),
-    UNSUPPORTED_EXTEND_FIELD_VALUE(3, "Unsupported extend field value"),
-    UNCONFIGURED_GROUPID_OR_STREAMID(4, "Unconfigured groupId or streamId"),
-    PUT_EVENT_TO_CHANNEL_FAILURE(5, "Put event to Channels failure"),
-
-    TOPIC_IS_BLANK(6, "Topic is null"),
-    NO_AVAILABLE_PRODUCER(7, "No available producer info"),
-    PRODUCER_IS_NULL(8, "Producer is null"),
-    SEND_REQUEST_TO_MQ_FAILURE(9, "Send request to MQ failure"),
-    MQ_RETURN_ERROR(10, "MQ client return error"),
-
-    DUPLICATED_MESSAGE(11, "Duplicated message"),
+    SINK_SERVICE_UNREADY(1, "Service not ready"),
+
+    MISS_REQUIRED_GROUPID_ARGUMENT(100, "Parameter groupId is required"),
+    MISS_REQUIRED_STREAMID_ARGUMENT(101, "Parameter streamId is required"),
+    MISS_REQUIRED_DT_ARGUMENT(102, "Parameter dt is required"),
+    MISS_REQUIRED_BODY_ARGUMENT(103, "Parameter body is required"),
+    BODY_EXCEED_MAX_LEN(104, "Body length exceed the maximum length"),
+
+    UNSUPPORTED_MSG_TYPE(110, "Unsupported msgType"),
+    EMPTY_MSG(111, "Empty message"),
+    UNSUPPORTED_EXTEND_FIELD_VALUE(112, "Unsupported extend field value"),
+    UNCONFIGURED_GROUPID_OR_STREAMID(113, "Unconfigured groupId or streamId"),
+    PUT_EVENT_TO_CHANNEL_FAILURE(114, "Put event to Channels failure"),
+
+    TOPIC_IS_BLANK(115, "Topic is null"),
+    NO_AVAILABLE_PRODUCER(116, "No available producer info"),
+    PRODUCER_IS_NULL(117, "Producer is null"),
+    SEND_REQUEST_TO_MQ_FAILURE(118, "Send request to MQ failure"),
+    MQ_RETURN_ERROR(119, "MQ client return error"),
+
+    DUPLICATED_MESSAGE(120, "Duplicated message"),
 
     UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 95b95792c..fcff9c3cd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
 
@@ -77,6 +78,8 @@ public class ConfigManager {
     private final FileConfigHolder blackListConfig = new FileConfigHolder("blacklist.properties");
     // source report configure holder
     private final SourceReportConfigHolder sourceReportConfigHolder = new SourceReportConfigHolder();
+    // mq clusters ready
+    private final AtomicBoolean mqClusterReady = new AtomicBoolean(false);
 
     /**
      * get instance for config manager
@@ -148,6 +151,14 @@ public class ConfigManager {
         return sourceReportConfigHolder.getSourceReportInfo();
     }
 
+    public boolean isMqClusterReady() {
+        return mqClusterReady.get();
+    }
+
+    public void updMqClusterStatus(boolean isStarted) {
+        mqClusterReady.set(isStarted);
+    }
+
     /**
      * update old maps, reload local files if changed.
      *
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
index 88cc89023..f6760081e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
@@ -29,8 +29,8 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.http.StatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +91,8 @@ public class ConfigMessageServlet extends HttpServlet {
 
     @Override
     protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
-        ResponseResult result = new ResponseResult(StatusCode.SERVICE_ERR, "");
+        ResponseResult result =
+                new ResponseResult(DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), "");
         BufferedReader reader = null;
         try {
             reader = req.getReader();
@@ -110,7 +111,7 @@ public class ConfigMessageServlet extends HttpServlet {
             }
 
             if (isSuccess) {
-                result.setCode(StatusCode.SUCCESS);
+                result.setCode(DataProxyErrCode.SUCCESS.getErrCode());
             } else {
                 result.setMessage("cannot operate config update, please check it");
             }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 4c6b1ce51..a4c23ca24 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -19,7 +19,9 @@ package org.apache.inlong.dataproxy.http;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.ChannelException;
+import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,81 +51,98 @@ public class MessageFilter implements Filter {
     }
 
     @Override
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
+    public void doFilter(ServletRequest request,
+                         ServletResponse response,
+                         FilterChain chain) throws IOException {
         HttpServletRequest req = (HttpServletRequest) request;
         HttpServletResponse resp = (HttpServletResponse) response;
 
-        int code = StatusCode.SUCCESS;
-        String message = "success";
-
         String pathInfo = req.getPathInfo();
         if (pathInfo.startsWith("/")) {
             pathInfo = pathInfo.substring(1);
         }
         if ("heartbeat".equals(pathInfo)) {
-            resp.setCharacterEncoding(req.getCharacterEncoding());
-            resp.setStatus(HttpServletResponse.SC_OK);
-            resp.flushBuffer();
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
             return;
         }
-
-        String invalidKey = null;
+        // check sink service status
+        if (!ConfigManager.getInstance().isMqClusterReady()) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(),
+                    DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg());
+            return;
+        }
+        // get and check groupId
         String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+        if (StringUtils.isEmpty(groupId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check streamId
         String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+        if (StringUtils.isEmpty(streamId)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check dt
         String dt = req.getParameter(AttributeConstants.DATA_TIME);
+        if (StringUtils.isEmpty(dt)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg());
+            return;
+        }
+        // get and check body
         String body = req.getParameter(AttrConstants.BODY);
-
-        if (StringUtils.isEmpty(groupId)) {
-            invalidKey = "groupId";
-        } else if (StringUtils.isEmpty(streamId)) {
-            invalidKey = "streamId";
-        } else if (StringUtils.isEmpty(dt)) {
-            invalidKey = "dt";
-        } else if (StringUtils.isEmpty(body)) {
-            invalidKey = "body";
+        if (StringUtils.isEmpty(body)) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
+                    DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg());
+            return;
+        }
+        // check body length
+        if (body.length() > maxMsgLength) {
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
+                    "Bad request, body length exceeds the limit:" + maxMsgLength);
+            return;
         }
-
         try {
-            if (invalidKey != null) {
-                LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
-                code = StatusCode.ILLEGAL_ARGUMENT;
-                message = "Bad request from client. " + invalidKey + " must not be empty.";
-            } else if (body.length() > maxMsgLength) {
-                LOG.warn("Received bad request from client. Body length is " + body.length());
-                code = StatusCode.EXCEED_LEN;
-                message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength;
-            } else {
-                chain.doFilter(request, response);
-            }
+            chain.doFilter(request, response);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.SUCCESS.getErrCode(),
+                    DataProxyErrCode.SUCCESS.getErrMsg());
         } catch (Throwable t) {
-            code = StatusCode.SERVICE_ERR;
+            String errMsg;
             if ((t instanceof ChannelException)) {
-                message = "Channel error!";
+                errMsg = "Channel error! " + t.getMessage();
             } else {
-                message = "Service error!";
-                LOG.error("Request error!", t);
+                errMsg = "Service error! " + t.getMessage();
             }
+            LOG.error("Request error!", t);
+            returnRspPackage(resp, req.getCharacterEncoding(),
+                    DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), errMsg);
         }
-
-        resp.setCharacterEncoding(req.getCharacterEncoding());
-        resp.setStatus(HttpServletResponse.SC_OK);
-        resp.getWriter().write(getResultContent(code, message));
-        resp.flushBuffer();
     }
 
     @Override
     public void destroy() {
     }
 
-    private String getResultContent(int code, String message) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{\"code\":\"");
-        builder.append(code);
-        builder.append("\",\"msg\":\"");
-        builder.append(message);
-        builder.append("\"}");
-
-        return builder.toString();
+    private void returnRspPackage(HttpServletResponse resp, String charEncoding,
+                                  int errCode, String errMsg) throws IOException {
+        StringBuilder builder =
+                new StringBuilder().append("{\"code\":\"").append(errCode)
+                        .append("\",\"msg\":\"").append(errMsg).append("\"}");
+        resp.setCharacterEncoding(charEncoding);
+        resp.setStatus(HttpServletResponse.SC_OK);
+        resp.getWriter().write(builder.toString());
+        resp.flushBuffer();
     }
-
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
index 10ebcd966..61a5b2ab6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.http;
 
+import org.apache.inlong.common.enums.DataProxyErrCode;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.AttrConstants;
@@ -64,7 +65,7 @@ public class MessageProcessServlet extends HttpServlet {
             if (logCounter.shouldPrint()) {
                 LOG.error("Received bad request from client. ", e);
             }
-            req.setAttribute("code", StatusCode.SERVICE_ERR);
+            req.setAttribute("code", DataProxyErrCode.UNKNOWN_ERROR.getErrCode());
         }
     }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
deleted file mode 100644
index 88e411573..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.http;
-
-public interface StatusCode {
-
-    /*
-     * success
-     */
-    int SUCCESS = 1;
-
-    /*
-     * illegal argument
-     */
-    int ILLEGAL_ARGUMENT = -100;
-
-    /*
-     * exceed length
-     */
-    int EXCEED_LEN = -101;
-
-    /*
-     * service error
-     */
-    int SERVICE_ERR = -105;
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 52d690d76..6fd938030 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -305,13 +305,18 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                 new HashSet<>(topicProperties.values()));
 
         pulsarCluster = configManager.getMqClusterUrl2Token();
+        if (!ConfigManager.getInstance().isMqClusterReady()) {
+            this.canTake = true;
+            ConfigManager.getInstance().updMqClusterStatus(true);
+            logger.info("[{}] MQ Cluster service status ready!", getName());
+        }
     }
 
     @Override
     public void start() {
         logger.info("[{}] pulsar sink starting...", getName());
         sinkCounter.start();
-        pulsarClientService.initCreateConnection(this);
+        pulsarClientService.initCreateConnection(this, getName());
 
         int statIntervalSec = pulsarConfig.getStatIntervalSec();
         Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
@@ -345,7 +350,9 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
                         ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
         this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
         MetricRegister.register(metricItemSet);
-        this.canTake = true;
+        if (ConfigManager.getInstance().isMqClusterReady()) {
+            this.canTake = true;
+        }
         logger.info("[{}] Pulsar sink started", getName());
     }
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 461c0ab49..dee798214 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -128,14 +128,16 @@ public class TubeSink extends AbstractSink implements Configurable {
         tubeConfig = configManager.getMqClusterConfig();
         topicProperties = configManager.getTopicProperties();
         masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
-        // start message deduplication handler
-        MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
-                tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
         // only use first cluster address now
         usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists);
         // create producer holder
-        producerHolder = new TubeProducerHolder(getName(),
-                usedMasterAddr, configManager.getMqClusterConfig());
+        if (usedMasterAddr != null) {
+            producerHolder = new TubeProducerHolder(getName(),
+                    usedMasterAddr, configManager.getMqClusterConfig());
+        }
+        // start message deduplication handler
+        MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
+                tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
         // get statistic configure items
         maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000);
         statIntervalSec = tubeConfig.getStatIntervalSec();
@@ -196,17 +198,23 @@ public class TubeSink extends AbstractSink implements Configurable {
         this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
         MetricRegister.register(metricItemSet);
         // create tube connection
-        try {
-            producerHolder.start(new HashSet<>(topicProperties.values()));
-        } catch (FlumeException e) {
-            logger.error("Unable to start TubeMQ client. Exception follows.", e);
-            super.stop();
-            return;
+        if (producerHolder != null) {
+            try {
+                producerHolder.start(new HashSet<>(topicProperties.values()));
+                ConfigManager.getInstance().updMqClusterStatus(true);
+                logger.info("[{}] MQ Cluster service status ready!", getName());
+            } catch (FlumeException e) {
+                logger.error("Unable to start TubeMQ client. Exception follows.", e);
+                super.stop();
+                return;
+            }
         }
         // start the cleaner thread
         super.start();
         this.canSend = true;
-        this.canTake = true;
+        if (ConfigManager.getInstance().isMqClusterReady()) {
+            this.canTake = true;
+        }
         for (int i = 0; i < sinkThreadPool.length; i++) {
             sinkThreadPool[i] = new Thread(new TubeSinkTask(),
                     getName() + "_tube_sink_sender-" + i);
@@ -610,10 +618,12 @@ public class TubeSink extends AbstractSink implements Configurable {
         }
         // publish them
         if (!addedTopics.isEmpty()) {
-            try {
-                producerHolder.createProducersByTopicSet(addedTopics);
-            } catch (Exception e) {
-                logger.info(getName() + "'s publish new topic set fail.", e);
+            if (producerHolder != null) {
+                try {
+                    producerHolder.createProducersByTopicSet(addedTopics);
+                } catch (Exception e) {
+                    logger.info(getName() + "'s publish new topic set fail.", e);
+                }
             }
             logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
                     addedTopics);
@@ -658,7 +668,17 @@ public class TubeSink extends AbstractSink implements Configurable {
         producerHolder = newProducerHolder;
         usedMasterAddr = newMasterAddr;
         // close old producer holder
-        tmpProducerHolder.stop();
+        if (tmpProducerHolder == null) {
+            diffSetPublish(new HashSet<>(),
+                    new HashSet<>(configManager.getTopicProperties().values()));
+        } else {
+            tmpProducerHolder.stop();
+        }
+        if (!ConfigManager.getInstance().isMqClusterReady()) {
+            this.canTake = true;
+            ConfigManager.getInstance().updMqClusterStatus(true);
+            logger.info("[{}] MQ Cluster service status ready!", getName());
+        }
         logger.info(getName() + " switch cluster from "
                 + tmpMasterAddr + " to " + usedMasterAddr);
     }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
index c1852fed3..04b43c87f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
@@ -211,7 +211,7 @@ public class TubeProducerHolder {
      *
      * @param cfgTopicSet  the configured topic set
      */
-    public void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception {
+    public synchronized void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception {
         if (cfgTopicSet == null || cfgTopicSet.isEmpty()) {
             return;
         }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index cca008c19..13d58c4fc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -116,7 +116,7 @@ public class PulsarClientService {
         topicSendIndexMap = new ConcurrentHashMap<>();
     }
 
-    public void initCreateConnection(CreatePulsarClientCallBack callBack) {
+    public void initCreateConnection(CreatePulsarClientCallBack callBack, String sinkName) {
         pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
             logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
@@ -124,6 +124,10 @@ public class PulsarClientService {
         }
         try {
             createConnection(callBack);
+            if (!ConfigManager.getInstance().isMqClusterReady()) {
+                ConfigManager.getInstance().updMqClusterStatus(true);
+                logger.info("[{}] MQ Cluster service status ready!", sinkName);
+            }
         } catch (FlumeException e) {
             logger.error("unable to create pulsar client: ", e);
             close();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 01767ffb9..7c3636fe1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -284,6 +284,14 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
             if (commonAttrMap == null) {
                 commonAttrMap = new HashMap<>();
             }
+            // check whether extract data failure
+            String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+            if (!StringUtils.isEmpty(errCode)
+                    && !DataProxyErrCode.SUCCESS.getErrCodeStr().equals(errCode)) {
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+                return;
+            }
             // process heartbeat message
             if (MsgType.MSG_HEARTBEAT.equals(msgType)
                     || MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
@@ -310,7 +318,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
                         commonAttrMap, resultMap, remoteChannel, msgType);
                 return;
             }
-            // transfer message data
+            // check sink service status
+            if (!ConfigManager.getInstance().isMqClusterReady()) {
+                commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+                        DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCodeStr());
+                MessageUtils.sourceReturnRspPackage(
+                        commonAttrMap, resultMap, remoteChannel, msgType);
+                return;
+            }
+            // convert message data
             Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
                     new HashMap<>(msgList.size());
             if (!convertMsgList(msgList, commonAttrMap, messageMap, strRemoteIP)) {