Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/02/24 04:16:14 UTC

[GitHub] [incubator-inlong] pocozh commented on a change in pull request #2670: [INLONG-2568][DataProxy] Support dynamically getting TubeMq config from Manager

pocozh commented on a change in pull request #2670:
URL: https://github.com/apache/incubator-inlong/pull/2670#discussion_r813532439



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
##########
@@ -299,6 +300,65 @@ private void destroyConnection() {
         logger.debug("closed meta producer");
     }
 
+    /**
+     * close pulsarClients(the related url is removed); start pulsarClients for new url, and create producers for them
+     *
+     * @param callBack
+     * @param needToClose url-token map
+     * @param needToStart url-token map
+     * @param topicSet    for new pulsarClient, create these topics' producers
+     */
+    public void updatePulsarClients(CreatePulsarClientCallBack callBack, Map<String, String> needToClose,
+                                    Map<String, String> needToStart, Set<String> topicSet) {
+        // close
+        for (String url : needToClose.keySet()) {
+            PulsarClient pulsarClient = pulsarClients.get(url);
+            if (pulsarClient != null) {
+                try {
+                    pulsarClient.shutdown();
+                    pulsarClients.remove(url);
+                } catch (PulsarClientException e) {
+                    logger.error("shutdown pulsarClient error in PulsarSink, PulsarClientException {}",
+                            e.getMessage());
+                } catch (Exception e) {
+                    logger.error("shutdown pulsarClient error in PulsarSink, ex {}", e.getMessage());
+                }
+            }
+        }
+        // new pulsarClient
+        for (Map.Entry<String, String> entry : needToStart.entrySet()) {
+            String url = entry.getKey();
+            String token = entry.getValue();
+            try {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("url = {}, token = {}", url, token);
+                }
+                PulsarClient client = initPulsarClient(url, token);
+                pulsarClients.put(url, client);
+                callBack.handleCreateClientSuccess(url);

Review comment:
       In the sendMessage method, messages are dispatched to a topic's producer through producerInfoMap (topic -> producer), not through pulsarClients. So a producer that has not been initialized yet is never selected to send a message.
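
To illustrate the point, here is a minimal sketch under simplifying assumptions. `TopicProducer` and `DispatchSketch` are hypothetical stand-ins, not the classes in this PR and not the Pulsar client API; only the map names `pulsarClients` and `producerInfoMap` are taken from the code under review. It shows that registering a client alone does not make a topic sendable, because the send path only consults the topic -> producer map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a per-topic producer; NOT the Pulsar Producer API.
interface TopicProducer {
    void send(String body);
}

// Hypothetical, simplified model of the dispatch path discussed above.
class DispatchSketch {
    // url -> client handle (a String placeholder here); never consulted when sending
    final Map<String, String> pulsarClients = new ConcurrentHashMap<>();
    // topic -> producer; the only map the send path looks at
    final Map<String, TopicProducer> producerInfoMap = new ConcurrentHashMap<>();

    // Returns true only if a producer is registered for the topic.
    boolean sendMessage(String topic, String body) {
        TopicProducer producer = producerInfoMap.get(topic);
        if (producer == null) {
            // No initialized producer for this topic: nothing is dispatched.
            return false;
        }
        producer.send(body);
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        List<String> sent = new ArrayList<>();

        // A client exists for the url, but no producer has been created yet.
        sketch.pulsarClients.put("pulsar://example:6650", "client");
        System.out.println(sketch.sendMessage("topicA", "m1")); // false

        // Once the producer is registered, the topic becomes eligible.
        sketch.producerInfoMap.put("topicA", sent::add);
        System.out.println(sketch.sendMessage("topicA", "m2")); // true
    }
}
```

In this model, adding an entry to `pulsarClients` before its producers exist does not expose a half-initialized producer to the send path.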



