You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/11/07 08:02:38 UTC
[inlong] branch master updated: [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aea39cc19 [INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413)
aea39cc19 is described below
commit aea39cc19283ccdfa696ce8470b8e8ac60bed42f
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Nov 7 16:02:32 2022 +0800
[INLONG-6406][DataProxy] DataProxy should support creating sink dynamically after started (#6413)
---
.../inlong/common/enums/DataProxyErrCode.java | 34 +++---
.../inlong/dataproxy/config/ConfigManager.java | 11 ++
.../config/remote/ConfigMessageServlet.java | 7 +-
.../inlong/dataproxy/http/MessageFilter.java | 117 ++++++++++++---------
.../dataproxy/http/MessageProcessServlet.java | 3 +-
.../apache/inlong/dataproxy/http/StatusCode.java | 41 --------
.../apache/inlong/dataproxy/sink/PulsarSink.java | 11 +-
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 54 +++++++---
.../dataproxy/sink/common/TubeProducerHolder.java | 2 +-
.../dataproxy/sink/pulsar/PulsarClientService.java | 6 +-
.../dataproxy/source/ServerMessageHandler.java | 18 +++-
11 files changed, 175 insertions(+), 129 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 8943ebfd3..732f376e0 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -25,19 +25,27 @@ public enum DataProxyErrCode {
SUCCESS(0, "Ok"),
- UNSUPPORTED_MSG_TYPE(1, "Unsupported msgType"),
- EMPTY_MSG(2, "Empty message"),
- UNSUPPORTED_EXTEND_FIELD_VALUE(3, "Unsupported extend field value"),
- UNCONFIGURED_GROUPID_OR_STREAMID(4, "Unconfigured groupId or streamId"),
- PUT_EVENT_TO_CHANNEL_FAILURE(5, "Put event to Channels failure"),
-
- TOPIC_IS_BLANK(6, "Topic is null"),
- NO_AVAILABLE_PRODUCER(7, "No available producer info"),
- PRODUCER_IS_NULL(8, "Producer is null"),
- SEND_REQUEST_TO_MQ_FAILURE(9, "Send request to MQ failure"),
- MQ_RETURN_ERROR(10, "MQ client return error"),
-
- DUPLICATED_MESSAGE(11, "Duplicated message"),
+ SINK_SERVICE_UNREADY(1, "Service not ready"),
+
+ MISS_REQUIRED_GROUPID_ARGUMENT(100, "Parameter groupId is required"),
+ MISS_REQUIRED_STREAMID_ARGUMENT(101, "Parameter streamId is required"),
+ MISS_REQUIRED_DT_ARGUMENT(102, "Parameter dt is required"),
+ MISS_REQUIRED_BODY_ARGUMENT(103, "Parameter body is required"),
+ BODY_EXCEED_MAX_LEN(104, "Body length exceed the maximum length"),
+
+ UNSUPPORTED_MSG_TYPE(110, "Unsupported msgType"),
+ EMPTY_MSG(111, "Empty message"),
+ UNSUPPORTED_EXTEND_FIELD_VALUE(112, "Unsupported extend field value"),
+ UNCONFIGURED_GROUPID_OR_STREAMID(113, "Unconfigured groupId or streamId"),
+ PUT_EVENT_TO_CHANNEL_FAILURE(114, "Put event to Channels failure"),
+
+ TOPIC_IS_BLANK(115, "Topic is null"),
+ NO_AVAILABLE_PRODUCER(116, "No available producer info"),
+ PRODUCER_IS_NULL(117, "Producer is null"),
+ SEND_REQUEST_TO_MQ_FAILURE(118, "Send request to MQ failure"),
+ MQ_RETURN_ERROR(119, "MQ client return error"),
+
+ DUPLICATED_MESSAGE(120, "Duplicated message"),
UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 95b95792c..fcff9c3cd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.inlong.dataproxy.consts.ConfigConstants.CONFIG_CHECK_INTERVAL;
@@ -77,6 +78,8 @@ public class ConfigManager {
private final FileConfigHolder blackListConfig = new FileConfigHolder("blacklist.properties");
// source report configure holder
private final SourceReportConfigHolder sourceReportConfigHolder = new SourceReportConfigHolder();
+ // whether the MQ cluster service is ready
+ private final AtomicBoolean mqClusterReady = new AtomicBoolean(false);
/**
* get instance for config manager
@@ -148,6 +151,14 @@ public class ConfigManager {
return sourceReportConfigHolder.getSourceReportInfo();
}
+ public boolean isMqClusterReady() {
+ return mqClusterReady.get();
+ }
+
+ public void updMqClusterStatus(boolean isStarted) {
+ mqClusterReady.set(isStarted);
+ }
+
/**
* update old maps, reload local files if changed.
*
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
index 88cc89023..f6760081e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/remote/ConfigMessageServlet.java
@@ -29,8 +29,8 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.http.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +91,8 @@ public class ConfigMessageServlet extends HttpServlet {
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
- ResponseResult result = new ResponseResult(StatusCode.SERVICE_ERR, "");
+ ResponseResult result =
+ new ResponseResult(DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), "");
BufferedReader reader = null;
try {
reader = req.getReader();
@@ -110,7 +111,7 @@ public class ConfigMessageServlet extends HttpServlet {
}
if (isSuccess) {
- result.setCode(StatusCode.SUCCESS);
+ result.setCode(DataProxyErrCode.SUCCESS.getErrCode());
} else {
result.setMessage("cannot operate config update, please check it");
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 4c6b1ce51..a4c23ca24 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -19,7 +19,9 @@ package org.apache.inlong.dataproxy.http;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.ChannelException;
+import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,81 +51,98 @@ public class MessageFilter implements Filter {
}
@Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException {
+ public void doFilter(ServletRequest request,
+ ServletResponse response,
+ FilterChain chain) throws IOException {
HttpServletRequest req = (HttpServletRequest) request;
HttpServletResponse resp = (HttpServletResponse) response;
- int code = StatusCode.SUCCESS;
- String message = "success";
-
String pathInfo = req.getPathInfo();
if (pathInfo.startsWith("/")) {
pathInfo = pathInfo.substring(1);
}
if ("heartbeat".equals(pathInfo)) {
- resp.setCharacterEncoding(req.getCharacterEncoding());
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.flushBuffer();
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.SUCCESS.getErrCode(),
+ DataProxyErrCode.SUCCESS.getErrMsg());
return;
}
-
- String invalidKey = null;
+ // check sink service status
+ if (!ConfigManager.getInstance().isMqClusterReady()) {
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCode(),
+ DataProxyErrCode.SINK_SERVICE_UNREADY.getErrMsg());
+ return;
+ }
+ // get and check groupId
String groupId = req.getParameter(AttributeConstants.GROUP_ID);
+ if (StringUtils.isEmpty(groupId)) {
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
+ DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrMsg());
+ return;
+ }
+ // get and check streamId
String streamId = req.getParameter(AttributeConstants.STREAM_ID);
+ if (StringUtils.isEmpty(streamId)) {
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
+ DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrMsg());
+ return;
+ }
+ // get and check dt
String dt = req.getParameter(AttributeConstants.DATA_TIME);
+ if (StringUtils.isEmpty(dt)) {
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrCode(),
+ DataProxyErrCode.MISS_REQUIRED_DT_ARGUMENT.getErrMsg());
+ return;
+ }
+ // get and check body
String body = req.getParameter(AttrConstants.BODY);
-
- if (StringUtils.isEmpty(groupId)) {
- invalidKey = "groupId";
- } else if (StringUtils.isEmpty(streamId)) {
- invalidKey = "streamId";
- } else if (StringUtils.isEmpty(dt)) {
- invalidKey = "dt";
- } else if (StringUtils.isEmpty(body)) {
- invalidKey = "body";
+ if (StringUtils.isEmpty(body)) {
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
+ DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrMsg());
+ return;
+ }
+ // check body length
+ if (body.length() > maxMsgLength) {
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
+ "Bad request, body length exceeds the limit:" + maxMsgLength);
+ return;
}
-
try {
- if (invalidKey != null) {
- LOG.warn("Received bad request from client. " + invalidKey + " is empty.");
- code = StatusCode.ILLEGAL_ARGUMENT;
- message = "Bad request from client. " + invalidKey + " must not be empty.";
- } else if (body.length() > maxMsgLength) {
- LOG.warn("Received bad request from client. Body length is " + body.length());
- code = StatusCode.EXCEED_LEN;
- message = "Bad request from client. Body length is exceeding the limit:" + maxMsgLength;
- } else {
- chain.doFilter(request, response);
- }
+ chain.doFilter(request, response);
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.SUCCESS.getErrCode(),
+ DataProxyErrCode.SUCCESS.getErrMsg());
} catch (Throwable t) {
- code = StatusCode.SERVICE_ERR;
+ String errMsg;
if ((t instanceof ChannelException)) {
- message = "Channel error!";
+ errMsg = "Channel error! " + t.getMessage();
} else {
- message = "Service error!";
- LOG.error("Request error!", t);
+ errMsg = "Service error! " + t.getMessage();
}
+ LOG.error("Request error!", t);
+ returnRspPackage(resp, req.getCharacterEncoding(),
+ DataProxyErrCode.UNKNOWN_ERROR.getErrCode(), errMsg);
}
-
- resp.setCharacterEncoding(req.getCharacterEncoding());
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.getWriter().write(getResultContent(code, message));
- resp.flushBuffer();
}
@Override
public void destroy() {
}
- private String getResultContent(int code, String message) {
- StringBuilder builder = new StringBuilder();
- builder.append("{\"code\":\"");
- builder.append(code);
- builder.append("\",\"msg\":\"");
- builder.append(message);
- builder.append("\"}");
-
- return builder.toString();
+ private void returnRspPackage(HttpServletResponse resp, String charEncoding,
+ int errCode, String errMsg) throws IOException {
+ StringBuilder builder =
+ new StringBuilder().append("{\"code\":\"").append(errCode)
+ .append("\",\"msg\":\"").append(errMsg).append("\"}");
+ resp.setCharacterEncoding(charEncoding);
+ resp.setStatus(HttpServletResponse.SC_OK);
+ resp.getWriter().write(builder.toString());
+ resp.flushBuffer();
}
-
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
index 10ebcd966..61a5b2ab6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageProcessServlet.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.http;
+import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.dataproxy.consts.AttrConstants;
@@ -64,7 +65,7 @@ public class MessageProcessServlet extends HttpServlet {
if (logCounter.shouldPrint()) {
LOG.error("Received bad request from client. ", e);
}
- req.setAttribute("code", StatusCode.SERVICE_ERR);
+ req.setAttribute("code", DataProxyErrCode.UNKNOWN_ERROR.getErrCode());
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
deleted file mode 100644
index 88e411573..000000000
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/StatusCode.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.dataproxy.http;
-
-public interface StatusCode {
-
- /*
- * success
- */
- int SUCCESS = 1;
-
- /*
- * illegal argument
- */
- int ILLEGAL_ARGUMENT = -100;
-
- /*
- * exceed length
- */
- int EXCEED_LEN = -101;
-
- /*
- * service error
- */
- int SERVICE_ERR = -105;
-}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 52d690d76..6fd938030 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -305,13 +305,18 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
new HashSet<>(topicProperties.values()));
pulsarCluster = configManager.getMqClusterUrl2Token();
+ if (!ConfigManager.getInstance().isMqClusterReady()) {
+ this.canTake = true;
+ ConfigManager.getInstance().updMqClusterStatus(true);
+ logger.info("[{}] MQ Cluster service status ready!", getName());
+ }
}
@Override
public void start() {
logger.info("[{}] pulsar sink starting...", getName());
sinkCounter.start();
- pulsarClientService.initCreateConnection(this);
+ pulsarClientService.initCreateConnection(this, getName());
int statIntervalSec = pulsarConfig.getStatIntervalSec();
Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0");
@@ -345,7 +350,9 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
MetricRegister.register(metricItemSet);
- this.canTake = true;
+ if (ConfigManager.getInstance().isMqClusterReady()) {
+ this.canTake = true;
+ }
logger.info("[{}] Pulsar sink started", getName());
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 461c0ab49..dee798214 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -128,14 +128,16 @@ public class TubeSink extends AbstractSink implements Configurable {
tubeConfig = configManager.getMqClusterConfig();
topicProperties = configManager.getTopicProperties();
masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet();
- // start message deduplication handler
- MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
- tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
// only use first cluster address now
usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists);
// create producer holder
- producerHolder = new TubeProducerHolder(getName(),
- usedMasterAddr, configManager.getMqClusterConfig());
+ if (usedMasterAddr != null) {
+ producerHolder = new TubeProducerHolder(getName(),
+ usedMasterAddr, configManager.getMqClusterConfig());
+ }
+ // start message deduplication handler
+ MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(),
+ tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize());
// get statistic configure items
maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000);
statIntervalSec = tubeConfig.getStatIntervalSec();
@@ -196,17 +198,23 @@ public class TubeSink extends AbstractSink implements Configurable {
this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
MetricRegister.register(metricItemSet);
// create tube connection
- try {
- producerHolder.start(new HashSet<>(topicProperties.values()));
- } catch (FlumeException e) {
- logger.error("Unable to start TubeMQ client. Exception follows.", e);
- super.stop();
- return;
+ if (producerHolder != null) {
+ try {
+ producerHolder.start(new HashSet<>(topicProperties.values()));
+ ConfigManager.getInstance().updMqClusterStatus(true);
+ logger.info("[{}] MQ Cluster service status ready!", getName());
+ } catch (FlumeException e) {
+ logger.error("Unable to start TubeMQ client. Exception follows.", e);
+ super.stop();
+ return;
+ }
}
// start the cleaner thread
super.start();
this.canSend = true;
- this.canTake = true;
+ if (ConfigManager.getInstance().isMqClusterReady()) {
+ this.canTake = true;
+ }
for (int i = 0; i < sinkThreadPool.length; i++) {
sinkThreadPool[i] = new Thread(new TubeSinkTask(),
getName() + "_tube_sink_sender-" + i);
@@ -610,10 +618,12 @@ public class TubeSink extends AbstractSink implements Configurable {
}
// publish them
if (!addedTopics.isEmpty()) {
- try {
- producerHolder.createProducersByTopicSet(addedTopics);
- } catch (Exception e) {
- logger.info(getName() + "'s publish new topic set fail.", e);
+ if (producerHolder != null) {
+ try {
+ producerHolder.createProducersByTopicSet(addedTopics);
+ } catch (Exception e) {
+ logger.info(getName() + "'s publish new topic set fail.", e);
+ }
}
logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
addedTopics);
@@ -658,7 +668,17 @@ public class TubeSink extends AbstractSink implements Configurable {
producerHolder = newProducerHolder;
usedMasterAddr = newMasterAddr;
// close old producer holder
- tmpProducerHolder.stop();
+ if (tmpProducerHolder == null) {
+ diffSetPublish(new HashSet<>(),
+ new HashSet<>(configManager.getTopicProperties().values()));
+ } else {
+ tmpProducerHolder.stop();
+ }
+ if (!ConfigManager.getInstance().isMqClusterReady()) {
+ this.canTake = true;
+ ConfigManager.getInstance().updMqClusterStatus(true);
+ logger.info("[{}] MQ Cluster service status ready!", getName());
+ }
logger.info(getName() + " switch cluster from "
+ tmpMasterAddr + " to " + usedMasterAddr);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
index c1852fed3..04b43c87f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java
@@ -211,7 +211,7 @@ public class TubeProducerHolder {
*
* @param cfgTopicSet the configured topic set
*/
- public void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception {
+ public synchronized void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception {
if (cfgTopicSet == null || cfgTopicSet.isEmpty()) {
return;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index cca008c19..13d58c4fc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -116,7 +116,7 @@ public class PulsarClientService {
topicSendIndexMap = new ConcurrentHashMap<>();
}
- public void initCreateConnection(CreatePulsarClientCallBack callBack) {
+ public void initCreateConnection(CreatePulsarClientCallBack callBack, String sinkName) {
pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
@@ -124,6 +124,10 @@ public class PulsarClientService {
}
try {
createConnection(callBack);
+ if (!ConfigManager.getInstance().isMqClusterReady()) {
+ ConfigManager.getInstance().updMqClusterStatus(true);
+ logger.info("[{}] MQ Cluster service status ready!", sinkName);
+ }
} catch (FlumeException e) {
logger.error("unable to create pulsar client: ", e);
close();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 01767ffb9..7c3636fe1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -284,6 +284,14 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (commonAttrMap == null) {
commonAttrMap = new HashMap<>();
}
+ // check whether extracting the message data has already failed
+ String errCode = commonAttrMap.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+ if (!StringUtils.isEmpty(errCode)
+ && !DataProxyErrCode.SUCCESS.getErrCodeStr().equals(errCode)) {
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ return;
+ }
// process heartbeat message
if (MsgType.MSG_HEARTBEAT.equals(msgType)
|| MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
@@ -310,7 +318,15 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
commonAttrMap, resultMap, remoteChannel, msgType);
return;
}
- // transfer message data
+ // check sink service status
+ if (!ConfigManager.getInstance().isMqClusterReady()) {
+ commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
+ DataProxyErrCode.SINK_SERVICE_UNREADY.getErrCodeStr());
+ MessageUtils.sourceReturnRspPackage(
+ commonAttrMap, resultMap, remoteChannel, msgType);
+ return;
+ }
+ // convert message data
Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
new HashMap<>(msgList.size());
if (!convertMsgList(msgList, commonAttrMap, messageMap, strRemoteIP)) {