You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/15 04:54:32 UTC

[inlong] branch master updated: [INLONG-5055][Dataproxy] Fix NPE error of Pulsar client (#5057)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fe2319e4 [INLONG-5055][Dataproxy] Fix NPE error of Pulsar client (#5057)
4fe2319e4 is described below

commit 4fe2319e436acacc87e81e12213bd2665feb94a4
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Jul 15 12:54:27 2022 +0800

    [INLONG-5055][Dataproxy] Fix NPE error of Pulsar client (#5057)
---
 .../dataproxy/sink/pulsar/PulsarClientService.java | 175 +++++++++------------
 1 file changed, 75 insertions(+), 100 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index cef15ad0a..5683c8035 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -58,14 +58,16 @@ import java.util.concurrent.atomic.AtomicLong;
 public class PulsarClientService {
 
     private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
-
+    public Map<String, List<TopicProducerInfo>> producerInfoMap;
+    public Map<String, AtomicLong> topicSendIndexMap;
+    public Map<String, PulsarClient> pulsarClients = new ConcurrentHashMap<>();
+    public int pulsarClientIoThreads;
+    public int pulsarConnectionsPreBroker;
     /*
      * for pulsar client
      */
     private Map<String, String> pulsarUrl2token;
-
     private String authType;
-
     /*
      * for producer
      */
@@ -83,12 +85,6 @@ public class PulsarClientService {
     private int maxBatchingMessages = 1000;
     private long maxBatchingPublishDelayMillis = 1;
     private long retryIntervalWhenSendMsgError = 30 * 1000L;
-    public Map<String, List<TopicProducerInfo>> producerInfoMap;
-    public Map<String, AtomicLong> topicSendIndexMap;
-    public Map<String, PulsarClient> pulsarClients;
-
-    public int pulsarClientIoThreads;
-    public int pulsarConnectionsPreBroker;
     private String localIp = "127.0.0.1";
 
     private StreamConfigLogMetric streamConfigLogMetric;
@@ -137,13 +133,13 @@ public class PulsarClientService {
 
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
         if (pulsarUrl2token == null || pulsarUrl2token.isEmpty()) {
-            logger.warn("Failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
+            logger.warn("failed to get Pulsar Cluster, make sure register pulsar to manager successfully.");
             return;
         }
         try {
             createConnection(callBack);
         } catch (FlumeException e) {
-            logger.error("Unable to create pulsar client" + ". Exception follows.", e);
+            logger.error("unable to create pulsar client: ", e);
             close();
         }
     }
@@ -152,7 +148,7 @@ public class PulsarClientService {
      * send message
      */
     public boolean sendMessage(int poolIndex, String topic, Event event,
-                               SendMessageCallBack sendMessageCallBack, EventStat es) {
+            SendMessageCallBack sendMessageCallBack, EventStat es) {
         TopicProducerInfo producerInfo = null;
         boolean result = false;
         final String inlongStreamId = getInlongStreamId(event);
@@ -161,7 +157,7 @@ public class PulsarClientService {
             producerInfo = getProducerInfo(poolIndex, topic, inlongGroupId, inlongStreamId);
         } catch (Exception e) {
             producerInfo = null;
-            logger.error("Get producer failed! topic = {}", topic, e);
+            logger.error("get producer failed! topic = " + topic, e);
             if (streamConfigLogMetric != null) {
                 streamConfigLogMetric.updateConfigLog(inlongGroupId,
                         inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
@@ -180,8 +176,7 @@ public class PulsarClientService {
              *  put it back into the illegal map
              */
             checkAndResponse(event, inlongGroupId, inlongStreamId);
-            sendMessageCallBack.handleMessageSendException(topic, es,
-                    new NotFoundException("producer info is null"));
+            sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer info is null"));
             return true;
         }
 
@@ -192,10 +187,9 @@ public class PulsarClientService {
         TopicProducerInfo forCallBackP = producerInfo;
         Producer producer = producerInfo.getProducer(poolIndex);
         if (producer == null) {
-            logger.warn("Get producer is null! topic = {}", topic);
+            logger.warn("get producer is null! topic = {}", topic);
             checkAndResponse(event, inlongGroupId, inlongStreamId);
-            sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is "
-                    + "null"));
+            sendMessageCallBack.handleMessageSendException(topic, es, new NotFoundException("producer is null"));
             return true;
         }
         if (es.isOrderMessage()) {
@@ -224,26 +218,26 @@ public class PulsarClientService {
             /*
              * avoid client timeout
              */
-            logger.debug("es.getRetryCnt() {}", es.getRetryCnt());
+            logger.debug("es.getRetryCnt() = {}", es.getRetryCnt());
             if (es.getRetryCnt() == 0 || es.getRetryCnt() == 1) {
-                sendResponse((OrderEvent)event, inlongGroupId, inlongStreamId);
+                sendResponse((OrderEvent) event, inlongGroupId, inlongStreamId);
             }
         } else {
             producer.newMessage().properties(proMap).value(event.getBody())
                     .sendAsync().thenAccept((msgId) -> {
-                AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
-                forCallBackP.setCanUseSend(true);
-                sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
-            }).exceptionally((e) -> {
-                if (streamConfigLogMetric != null) {
-                    streamConfigLogMetric.updateConfigLog(inlongGroupId,
-                            inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
-                            ConfigLogTypeEnum.ERROR, e.toString());
-                }
-                forCallBackP.setCanUseSend(false);
-                sendMessageCallBack.handleMessageSendException(topic, es, e);
-                return null;
-            });
+                        AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+                        forCallBackP.setCanUseSend(true);
+                        sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
+                    }).exceptionally((e) -> {
+                        if (streamConfigLogMetric != null) {
+                            streamConfigLogMetric.updateConfigLog(inlongGroupId,
+                                    inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+                                    ConfigLogTypeEnum.ERROR, e.toString());
+                        }
+                        forCallBackP.setCanUseSend(false);
+                        sendMessageCallBack.handleMessageSendException(topic, es, e);
+                        return null;
+                    });
             result = true;
         }
         return result;
@@ -251,31 +245,31 @@ public class PulsarClientService {
 
     private void checkAndResponse(Event event, String inlongGroupId, String inlongStreamId) {
         if (MessageUtils.isSyncSendForOrder(event) && (event instanceof OrderEvent)) {
-            sendResponse((OrderEvent)event, inlongGroupId, inlongStreamId);
+            sendResponse((OrderEvent) event, inlongGroupId, inlongStreamId);
         }
     }
 
     /**
      * send Response
+     *
      * @param orderEvent orderEvent
      */
     private void sendResponse(OrderEvent orderEvent, String inlongGroupId, String inlongStreamId) {
         String sequenceId = orderEvent.getHeaders().get(AttributeConstants.UNIQ_ID);
         if ("false".equals(orderEvent.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
             if (logger.isDebugEnabled()) {
-                logger.debug("Not need to rsp message: seqId = {}, inlongGroupId = {}, "
-                                + "inlongStreamId = {}",sequenceId, inlongGroupId, inlongStreamId);
+                logger.debug("not need to rsp message: seqId = {}, inlongGroupId = {}, inlongStreamId = {}",
+                        sequenceId, inlongGroupId, inlongStreamId);
             }
             return;
         }
         if (orderEvent.getCtx() != null && orderEvent.getCtx().channel().isActive()) {
             orderEvent.getCtx().channel().eventLoop().execute(() -> {
                 if (logger.isDebugEnabled()) {
-                    logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, "
-                            + "inlongStreamId = {}",sequenceId, inlongGroupId, inlongStreamId);
+                    logger.debug("order message rsp: seqId = {}, inlongGroupId = {}, inlongStreamId = {}", sequenceId,
+                            inlongGroupId, inlongStreamId);
                 }
-                ByteBuf binBuffer = MessageUtils.getResponsePackage("",
-                        MsgType.MSG_BIN_MULTI_BODY, sequenceId);
+                ByteBuf binBuffer = MessageUtils.getResponsePackage("", MsgType.MSG_BIN_MULTI_BODY, sequenceId);
                 orderEvent.getCtx().writeAndFlush(binBuffer);
             });
         }
@@ -288,10 +282,9 @@ public class PulsarClientService {
      * @throws FlumeException if an RPC client connection could not be opened
      */
     private void createConnection(CreatePulsarClientCallBack callBack) throws FlumeException {
-        if (pulsarClients != null) {
+        if (!pulsarClients.isEmpty()) {
             return;
         }
-        pulsarClients = new ConcurrentHashMap<>();
         pulsarUrl2token = ConfigManager.getInstance().getMqClusterUrl2Token();
         logger.debug("number of pulsar cluster is {}", pulsarUrl2token.size());
         for (Map.Entry<String, String> info : pulsarUrl2token.entrySet()) {
@@ -311,21 +304,16 @@ public class PulsarClientService {
                             ConfigLogTypeEnum.ERROR, e.toString());
                 }
                 logger.error("create connection error in Pulsar sink, "
-                                + "maybe pulsar master set error, please re-check.url{}, ex1 {}",
-                        info.getKey(),
-                        e.getMessage());
+                        + "maybe pulsar master set error, please re-check. url " + info.getKey(), e);
             } catch (Throwable e) {
                 callBack.handleCreateClientException(info.getKey());
                 logger.error("create connection error in pulsar sink, "
-                                + "maybe pulsar master set error/shutdown in progress, please "
-                                + "re-check. url{}, ex2 {}",
-                        info.getKey(),
-                        e.getMessage());
+                        + "maybe pulsar master set error/shutdown in progress, please "
+                        + "re-check. url " + info.getKey(), e);
             }
         }
-        if (pulsarClients.size() == 0) {
-            throw new FlumeException("connect to pulsar error1, "
-                    + "maybe zkstr/zkroot set error, please re-check");
+        if (pulsarClients.isEmpty()) {
+            throw new FlumeException("connect to pulsar error, maybe zkstr/zkroot set error, please re-check");
         }
     }
 
@@ -347,21 +335,18 @@ public class PulsarClientService {
     public List<TopicProducerInfo> initTopicProducer(String topic, String inlongGroupId,
             String inlongStreamId) {
         List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
-            List<TopicProducerInfo> newList = null;
-            if (pulsarClients != null) {
-                newList = new ArrayList<>();
-                for (PulsarClient pulsarClient : pulsarClients.values()) {
-                    TopicProducerInfo info = new TopicProducerInfo(pulsarClient,
-                            sinkThreadPoolSize, topic);
-                    info.initProducer(inlongGroupId, inlongStreamId);
-                    if (info.isCanUseToSendMessage()) {
-                        newList.add(info);
-                    }
-                }
-                if (newList.size() == 0) {
-                    newList = null;
+            List<TopicProducerInfo> newList = new ArrayList<>();
+            for (PulsarClient pulsarClient : pulsarClients.values()) {
+                TopicProducerInfo info = new TopicProducerInfo(pulsarClient,
+                        sinkThreadPoolSize, topic);
+                info.initProducer(inlongGroupId, inlongStreamId);
+                if (info.isCanUseToSendMessage()) {
+                    newList.add(info);
                 }
             }
+            if (newList.size() == 0) {
+                newList = null;
+            }
             return newList;
         });
         return producerInfoList;
@@ -375,9 +360,7 @@ public class PulsarClientService {
             String inlongStreamId) {
         List<TopicProducerInfo> producerList =
                 initTopicProducer(topic, inlongGroupId, inlongStreamId);
-        AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> {
-            return new AtomicLong(0);
-        });
+        AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> new AtomicLong(0));
         int maxTryToGetProducer = producerList == null ? 0 : producerList.size();
         if (maxTryToGetProducer == 0) {
             return null;
@@ -402,19 +385,14 @@ public class PulsarClientService {
 
     private void destroyConnection() {
         producerInfoMap.clear();
-        if (pulsarClients != null) {
-            for (PulsarClient pulsarClient : pulsarClients.values()) {
-                try {
-                    pulsarClient.shutdown();
-                } catch (PulsarClientException e) {
-                    logger.error("destroy pulsarClient error in PulsarSink, PulsarClientException {}",
-                            e.getMessage());
-                } catch (Exception e) {
-                    logger.error("destroy pulsarClient error in PulsarSink, ex {}", e.getMessage());
-                }
+        for (PulsarClient pulsarClient : pulsarClients.values()) {
+            try {
+                pulsarClient.shutdown();
+            } catch (Exception e) {
+                logger.error("destroy pulsarClient error in PulsarSink: ", e);
             }
         }
-        pulsarClients = null;
+        pulsarClients.clear();
         logger.debug("closed meta producer");
     }
 
@@ -435,10 +413,10 @@ public class PulsarClientService {
      * @param callBack
      * @param needToClose url-token map
      * @param needToStart url-token map
-     * @param topicSet    for new pulsarClient, create these topics' producers
+     * @param topicSet for new pulsarClient, create these topics' producers
      */
     public void updatePulsarClients(CreatePulsarClientCallBack callBack, Map<String, String> needToClose,
-                                    Map<String, String> needToStart, Set<String> topicSet) {
+            Map<String, String> needToStart, Set<String> topicSet) {
         // close
         for (String url : needToClose.keySet()) {
             PulsarClient pulsarClient = pulsarClients.get(url);
@@ -447,15 +425,11 @@ public class PulsarClientService {
                     removeProducers(pulsarClient);
                     pulsarClient.shutdown();
                     pulsarClients.remove(url);
-                } catch (PulsarClientException e) {
-                    logger.error("shutdown pulsarClient error in PulsarSink, PulsarClientException {}",
-                            e.getMessage());
                 } catch (Exception e) {
-                    logger.error("shutdown pulsarClient error in PulsarSink, ex {}", e.getMessage());
+                    logger.error("shutdown pulsarClient error in PulsarSink: ", e);
                 }
             }
         }
-        // new pulsarClient
         for (Map.Entry<String, String> entry : needToStart.entrySet()) {
             String url = entry.getKey();
             String token = entry.getValue();
@@ -479,19 +453,20 @@ public class PulsarClientService {
 
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(url);
-                logger.error("create connnection error in pulsarsink, "
-                        + "maybe pulsar master set error, please re-check.url{}, ex1 {}", url, e.getMessage());
+                logger.error("create connnection error in pulsar sink, "
+                        + "maybe pulsar master set error, please re-check.url " + url, e);
             } catch (Throwable e) {
                 callBack.handleCreateClientException(url);
-                logger.error("create connnection error in pulsarsink, "
+                logger.error("create connnection error in pulsar sink, "
                         + "maybe pulsar master set error/shutdown in progress, please "
-                        + "re-check. url{}, ex2 {}", url, e.getMessage());
+                        + "re-check. url " + url, e);
             }
         }
     }
 
     /**
      * get inlong stream id from event
+     *
      * @param event event
      * @return inlong stream id
      */
@@ -506,7 +481,8 @@ public class PulsarClientService {
     }
 
     /**
-     *  get inlong group id from event
+     * get inlong group id from event
+     *
      * @param event event
      * @return inlong group id
      */
@@ -519,6 +495,7 @@ public class PulsarClientService {
     }
 
     class TopicProducerInfo {
+
         private long lastSendMsgErrorTime;
 
         private Producer[] producers;
@@ -533,8 +510,7 @@ public class PulsarClientService {
 
         private volatile Boolean isFinishInit = false;
 
-        public TopicProducerInfo(PulsarClient pulsarClient, int sinkThreadPoolSize,
-                                 String topic) {
+        public TopicProducerInfo(PulsarClient pulsarClient, int sinkThreadPoolSize, String topic) {
             this.pulsarClient = pulsarClient;
             this.sinkThreadPoolSize = sinkThreadPoolSize;
             this.topic = topic;
@@ -552,8 +528,8 @@ public class PulsarClientService {
                 }
                 isFinishInit = true;
             } catch (PulsarClientException e) {
-                logger.error("create pulsar client has error , topic = {}, inlongGroupId"
-                        + " = {}, inlongStreamId= {}", topic, inlongGroupId, inlongStreamId, e);
+                logger.error("create pulsar client has error, topic = {}, inlongGroupId = {}, inlongStreamId= {}",
+                        topic, inlongGroupId, inlongStreamId, e);
                 isFinishInit = false;
                 for (int i = 0; i < sinkThreadPoolSize; i++) {
                     if (producers[i] != null) {
@@ -571,8 +547,7 @@ public class PulsarClientService {
         }
 
         private Producer createProducer() throws PulsarClientException {
-            return pulsarClient.newProducer().sendTimeout(sendTimeout,
-                    TimeUnit.MILLISECONDS)
+            return pulsarClient.newProducer().sendTimeout(sendTimeout, TimeUnit.MILLISECONDS)
                     .topic(topic)
                     .enableBatching(enableBatch)
                     .blockIfQueueFull(blockIfQueueFull)
@@ -595,8 +570,8 @@ public class PulsarClientService {
         public boolean isCanUseToSendMessage() {
             if (isCanUseSend && isFinishInit) {
                 return true;
-            } else if (isFinishInit && (System.currentTimeMillis() - lastSendMsgErrorTime)
-                    > retryIntervalWhenSendMsgError) {
+            } else if (isFinishInit
+                    && (System.currentTimeMillis() - lastSendMsgErrorTime) > retryIntervalWhenSendMsgError) {
                 lastSendMsgErrorTime = System.currentTimeMillis();
                 return true;
             }
@@ -611,7 +586,7 @@ public class PulsarClientService {
                     }
                 }
             } catch (PulsarClientException e) {
-                logger.error("close pulsar producer has error e = {}", e);
+                logger.error("close pulsar producer has error: ", e);
             }
         }