You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/15 04:54:32 UTC
[inlong] branch master updated: [INLONG-5055][Dataproxy] Fix NPE error of Pulsar client (#5057)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4fe2319e4 [INLONG-5055][Dataproxy] Fix NPE error of Pulsar client (#5057)
4fe2319e4 is described below
commit 4fe2319e436acacc87e81e12213bd2665feb94a4
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Jul 15 12:54:27 2022 +0800
[INLONG-5055][Dataproxy] Fix NPE error of Pulsar client (#5057)
---
.../dataproxy/sink/pulsar/PulsarClientService.java | 175 +++++++++------------
1 file changed, 75 insertions(+), 100 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index cef15ad0a..5683c8035 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -58,14 +58,16 @@ import java.util.concurrent.atomic.AtomicLong;
public class PulsarClientService {
private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
-
+ public Map<String, List<TopicProducerInfo>> producerInfoMap;
+ public Map<String, AtomicLong> topicSendIndexMap;
+ public Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
+ public int pulsarClientIoThreads;
+ public int pulsarConnectionsPreBroker;
/*
* for pulsar client
*/
private Map<String, String> pulsarUrl2token;
-
private String authType;
-
/*
* for producer
*/
@@ -83,12 +85,6 @@ public class PulsarClientService {
private int maxBatchingMessages = 1000;
private long maxBatchingPublishDelayMillis = 1;
private long retryIntervalWhenSendMsgError = 30 * 1000L;
- public Map<String, List<TopicProducerInfo>> producerInfoMap;
- public Map<String, AtomicLong> topicSendIndexMap;
- public Map<String, PulsarClient> pulsarClients;
-
- public int pulsarClientIoThreads;
- public int pulsarConnectionsPreBroker;
private String localIp = "127.0.0.1";
private StreamConfigLogMetric streamConfigLogMetric;
@@ -137,13 +133,13 @@ public class PulsarClientService {
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
- logger.warn("Failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
+ logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
return;
}
try {
createConnection(callBack);
} catch (FlumeException e) {
- logger.error("Unable to create pulsar client" + ". Exception follows.", e);
+ logger.error("unable to create pulsar client: ", e);
close();
}
}
@@ -152,7 +148,7 @@ public class PulsarClientService {
* send message
*/
public boolean sendMessage(int poolIndex, String topic, Event event,
- SendMessageCallBack sendMessageCallBack, EventStat es) {
+ SendMessageCallBack sendMessageCallBack, EventStat es) {
TopicProducerInfo producerInfo = null;
boolean result = false;
final String inlongStreamId = getInlongStreamId(event);
@@ -161,7 +157,7 @@ public class PulsarClientService {
producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
} catch (Exception e) {
producerInfo = null;
- logger.error("Get producer failed! topic = {}", topic, e);
+ logger.error("get producer failed! topic = " + topic, e);
if (streamConfigLogMetric != null) {
streamConfigLogMetric.updateConfigLog(inlongGroupId,
inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
@@ -180,8 +176,7 @@ public class PulsarClientService {
* put it back into the illegal map
*/
checkAndResponse(event, inlongGroupId, inlongStreamId);
- sendMessageCallBack.handleMessageSendException(topic, es,
- new NotFoundException("producer info is null"));
+ sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer info is null"));
return true;
}
@@ -192,10 +187,9 @@ public class PulsarClientService {
TopicProducerInfo forCallBackP = producerInfo;
Producer producer = producerInfo.getProducer(poolIndex);
if (producer == null) {
- logger.warn("Get producer is null! topic = {}", topic);
+ logger.warn("get producer is null! topic = {}", topic);
checkAndResponse(event, inlongGroupId, inlongStreamId);
- sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is "
- + "null"));
+ sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is null"));
return true;
}
if (es.isOrderMessage()) {
@@ -224,26 +218,26 @@ public class PulsarClientService {
/*
* avoid client timeout
*/
- logger.debug("es.getRetryCnt() {}", es.getRetryCnt());
+ logger.debug("es.getRetryCnt() = {}", es.getRetryCnt());
if (es.getRetryCnt() == 0 || es.getRetryCnt() == 1) {
- sendResponse((OrderEvent)event, inlongGroupId, inlongStreamId);
+ sendResponse((OrderEvent) event, inlongGroupId, inlongStreamId);
}
} else {
producer.newMessage().properties(proMap).value(event.getBody())
.sendAsync().thenAccept((msgId) -> {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- forCallBackP.setCanUseSend(true);
- sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
- }).exceptionally((e) -> {
- if (streamConfigLogMetric != null) {
- streamConfigLogMetric.updateConfigLog(inlongGroupId,
- inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
- ConfigLogTypeEnum.ERROR, e.toString());
- }
- forCallBackP.setCanUseSend(false);
- sendMessageCallBack.handleMessageSendException(topic, es, e);
- return null;
- });
+ AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+ forCallBackP.setCanUseSend(true);
+ sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
+ }).exceptionally((e) -> {
+ if (streamConfigLogMetric != null) {
+ streamConfigLogMetric.updateConfigLog(inlongGroupId,
+ inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+ ConfigLogTypeEnum.ERROR, e.toString());
+ }
+ forCallBackP.setCanUseSend(false);
+ sendMessageCallBack.handleMessageSendException(topic, es, e);
+ return null;
+ });
result = true;
}
return result;
@@ -251,31 +245,31 @@ public class PulsarClientService {
private void checkAndResponse(Event event, String inlongGroupId, String inlongStreamId) {
if (MessageUtils.isSyncSendForOrder(event) && (event instanceof OrderEvent)) {
- sendResponse((OrderEvent)event, inlongGroupId, inlongStreamId);
+ sendResponse((OrderEvent) event, inlongGroupId, inlongStreamId);
}
}
/**
* send Response
+ *
* @param orderEvent orderEvent
*/
private void sendResponse(OrderEvent orderEvent, String inlongGroupId, String inlongStreamId) {
String sequenceId = orderEvent.getHeaders().get(AttributeConstants.UNIQ_ID);
if ("false".equals(orderEvent.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
if (logger.isDebugEnabled()) {
- logger.debug("Not need to rsp message: seqId = {}, inlongGroupId = {}, "
- + "inlongStreamId = {}",sequenceId, inlongGroupId, inlongStreamId);
+ logger.debug("not need to rsp message: seqId = {}, inlongGroupId = {}, inlongStreamId = {}",
+ sequenceId, inlongGroupId, inlongStreamId);
}
return;
}
if (orderEvent.getCtx() != null && orderEvent.getCtx().channel().isActive()) {
orderEvent.getCtx().channel().eventLoop().execute(() -> {
if (logger.isDebugEnabled()) {
- logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, "
- + "inlongStreamId = {}",sequenceId, inlongGroupId, inlongStreamId);
+ logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
+ inlongGroupId, inlongStreamId);
}
- ByteBuf binBuffer = MessageUtils.getResponsePackage("",
- MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+ ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
orderEvent.getCtx().writeAndFlush(binBuffer);
});
}
@@ -288,10 +282,9 @@ public class PulsarClientService {
* @throws FlumeException if an RPC client connection could not be opened
*/
private void createConnection(CreatePulsarClientCallBack callBack) throws FlumeException {
- if (pulsarClients != null) {
+ if (!pulsarClients.isEmpty()) {
return;
}
- pulsarClients = new ConcurrentHashMap<>();
pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
logger.debug("number of pulsar cluster is {}", pulsarUrl2token.size());
for (Map.Entry<String, String> info : pulsarUrl2token.entrySet()) {
@@ -311,21 +304,16 @@ public class PulsarClientService {
ConfigLogTypeEnum.ERROR, e.toString());
}
logger.error("create connection error in Pulsar sink, "
- + "maybe pulsar master set error, please re-check.url{}, ex1 {}",
- info.getKey(),
- e.getMessage());
+ + "maybe pulsar master set error, please re-check. url " + info.getKey(), e);
} catch (Throwable e) {
callBack.handleCreateClientException(info.getKey());
logger.error("create connection error in pulsar sink, "
- + "maybe pulsar master set error/shutdown in progress, please "
- + "re-check. url{}, ex2 {}",
- info.getKey(),
- e.getMessage());
+ + "maybe pulsar master set error/shutdown in progress, please "
+ + "re-check. url " + info.getKey(), e);
}
}
- if (pulsarClients.size() == 0) {
- throw new FlumeException("connect to pulsar error1, "
- + "maybe zkstr/zkroot set error, please re-check");
+ if (pulsarClients.isEmpty()) {
+ throw new FlumeException("connect to pulsar error, maybe zkstr/zkroot set error, please re-check");
}
}
@@ -347,21 +335,18 @@ public class PulsarClientService {
public List<TopicProducerInfo> initTopicProducer(String topic, String inlongGroupId,
String inlongStreamId) {
List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
- List<TopicProducerInfo> newList = null;
- if (pulsarClients != null) {
- newList = new ArrayList<>();
- for (PulsarClient pulsarClient : pulsarClients.values()) {
- TopicProducerInfo info = new TopicProducerInfo(pulsarClient,
- sinkThreadPoolSize, topic);
- info.initProducer(inlongGroupId, inlongStreamId);
- if (info.isCanUseToSendMessage()) {
- newList.add(info);
- }
- }
- if (newList.size() == 0) {
- newList = null;
+ List<TopicProducerInfo> newList = new ArrayList<>();
+ for (PulsarClient pulsarClient : pulsarClients.values()) {
+ TopicProducerInfo info = new TopicProducerInfo(pulsarClient,
+ sinkThreadPoolSize, topic);
+ info.initProducer(inlongGroupId, inlongStreamId);
+ if (info.isCanUseToSendMessage()) {
+ newList.add(info);
}
}
+ if (newList.size() == 0) {
+ newList = null;
+ }
return newList;
});
return producerInfoList;
@@ -375,9 +360,7 @@ public class PulsarClientService {
String inlongStreamId) {
List<TopicProducerInfo> producerList =
initTopicProducer(topic, inlongGroupId, inlongStreamId);
- AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> {
- return new AtomicLong(0);
- });
+ AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> new AtomicLong(0));
int maxTryToGetProducer = producerList == null ? 0 : producerList.size();
if (maxTryToGetProducer == 0) {
return null;
@@ -402,19 +385,14 @@ public class PulsarClientService {
private void destroyConnection() {
producerInfoMap.clear();
- if (pulsarClients != null) {
- for (PulsarClient pulsarClient : pulsarClients.values()) {
- try {
- pulsarClient.shutdown();
- } catch (PulsarClientException e) {
- logger.error("destroy pulsarClient error in PulsarSink, PulsarClientException {}",
- e.getMessage());
- } catch (Exception e) {
- logger.error("destroy pulsarClient error in PulsarSink, ex {}", e.getMessage());
- }
+ for (PulsarClient pulsarClient : pulsarClients.values()) {
+ try {
+ pulsarClient.shutdown();
+ } catch (Exception e) {
+ logger.error("destroy pulsarClient error in PulsarSink: ", e);
}
}
- pulsarClients = null;
+ pulsarClients.clear();
logger.debug("closed meta producer");
}
@@ -435,10 +413,10 @@ public class PulsarClientService {
* @param callBack
* @param needToClose url-token map
* @param needToStart url-token map
- * @param topicSet for new pulsarClient, create these topics' producers
+ * @param topicSet for new pulsarClient, create these topics' producers
*/
public void updatePulsarClients(CreatePulsarClientCallBack callBack, Map<String, String> needToClose,
- Map<String, String> needToStart, Set<String> topicSet) {
+ Map<String, String> needToStart, Set<String> topicSet) {
// close
for (String url : needToClose.keySet()) {
PulsarClient pulsarClient = pulsarClients.get(url);
@@ -447,15 +425,11 @@ public class PulsarClientService {
removeProducers(pulsarClient);
pulsarClient.shutdown();
pulsarClients.remove(url);
- } catch (PulsarClientException e) {
- logger.error("shutdown pulsarClient error in PulsarSink, PulsarClientException {}",
- e.getMessage());
} catch (Exception e) {
- logger.error("shutdown pulsarClient error in PulsarSink, ex {}", e.getMessage());
+ logger.error("shutdown pulsarClient error in PulsarSink: ", e);
}
}
}
- // new pulsarClient
for (Map.Entry<String, String> entry : needToStart.entrySet()) {
String url = entry.getKey();
String token = entry.getValue();
@@ -479,19 +453,20 @@ public class PulsarClientService {
} catch (PulsarClientException e) {
callBack.handleCreateClientException(url);
- logger.error("create connnection error in pulsarsink, "
- + "maybe pulsar master set error, please re-check.url{}, ex1 {}", url, e.getMessage());
+ logger.error("create connnection error in pulsar sink, "
+ + "maybe pulsar master set error, please re-check.url " + url, e);
} catch (Throwable e) {
callBack.handleCreateClientException(url);
- logger.error("create connnection error in pulsarsink, "
+ logger.error("create connnection error in pulsar sink, "
+ "maybe pulsar master set error/shutdown in progress, please "
- + "re-check. url{}, ex2 {}", url, e.getMessage());
+ + "re-check. url " + url, e);
}
}
}
/**
* get inlong stream id from event
+ *
* @param event event
* @return inlong stream id
*/
@@ -506,7 +481,8 @@ public class PulsarClientService {
}
/**
- * get inlong group id from event
+ * get inlong group id from event
+ *
* @param event event
* @return inlong group id
*/
@@ -519,6 +495,7 @@ public class PulsarClientService {
}
class TopicProducerInfo {
+
private long lastSendMsgErrorTime;
private Producer[] producers;
@@ -533,8 +510,7 @@ public class PulsarClientService {
private volatile Boolean isFinishInit = false;
- public TopicProducerInfo(PulsarClient pulsarClient, int sinkThreadPoolSize,
- String topic) {
+ public TopicProducerInfo(PulsarClient pulsarClient, int sinkThreadPoolSize, String topic) {
this.pulsarClient = pulsarClient;
this.sinkThreadPoolSize = sinkThreadPoolSize;
this.topic = topic;
@@ -552,8 +528,8 @@ public class PulsarClientService {
}
isFinishInit = true;
} catch (PulsarClientException e) {
- logger.error("create pulsar client has error , topic = {}, inlongGroupId"
- + " = {}, inlongStreamId= {}", topic, inlongGroupId, inlongStreamId, e);
+ logger.error("create pulsar client has error, topic = {}, inlongGroupId = {}, inlongStreamId= {}",
+ topic, inlongGroupId, inlongStreamId, e);
isFinishInit = false;
for (int i = 0; i < sinkThreadPoolSize; i++) {
if (producers[i] != null) {
@@ -571,8 +547,7 @@ public class PulsarClientService {
}
private Producer createProducer() throws PulsarClientException {
- return pulsarClient.newProducer().sendTimeout(sendTimeout,
- TimeUnit.MILLISECONDS)
+ return pulsarClient.newProducer().sendTimeout(sendTimeout, TimeUnit.MILLISECONDS)
.topic(topic)
.enableBatching(enableBatch)
.blockIfQueueFull(blockIfQueueFull)
@@ -595,8 +570,8 @@ public class PulsarClientService {
public boolean isCanUseToSendMessage() {
if (isCanUseSend && isFinishInit) {
return true;
- } else if (isFinishInit && (System.currentTimeMillis() - lastSendMsgErrorTime)
- > retryIntervalWhenSendMsgError) {
+ } else if (isFinishInit
+ && (System.currentTimeMillis() - lastSendMsgErrorTime) > retryIntervalWhenSendMsgError) {
lastSendMsgErrorTime = System.currentTimeMillis();
return true;
}
@@ -611,7 +586,7 @@ public class PulsarClientService {
}
}
} catch (PulsarClientException e) {
- logger.error("close pulsar producer has error e = {}", e);
+ logger.error("close pulsar producer has error: ", e);
}
}