You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/18 06:04:20 UTC

[incubator-inlong] branch master updated: [INLONG-3196][DataProxy] Support to produce message to different topic in multiple Pulsar Cache (#3197)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f969f58  [INLONG-3196][DataProxy] Support to produce message to different topic in multiple Pulsar Cache (#3197)
f969f58 is described below

commit f969f585606b2719f1f6e8951d3811a63b65eca2
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Mar 18 14:04:11 2022 +0800

    [INLONG-3196][DataProxy] Support to produce message to different topic in multiple Pulsar Cache (#3197)
---
 .../config/holder/IdTopicConfigHolder.java         |  1 +
 .../dataproxy/config/pojo/IdTopicConfig.java       | 12 +++++++++
 .../sink/pulsarzone/PulsarClusterProducer.java     | 31 ++++++++++++++--------
 3 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
index 9c52361..019536d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
@@ -129,6 +129,7 @@ public class IdTopicConfigHolder implements Configurable {
             Map<String, IdTopicConfig> newConfigMap = new ConcurrentHashMap<>();
             for (IdTopicConfig config : newConfigList) {
                 newConfigMap.put(config.getUid(), config);
+                config.formatTopicName();
             }
             this.configList = newConfigList;
             this.configMap = newConfigMap;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index c8a1fa3..153843e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -175,4 +175,16 @@ public class IdTopicConfig {
         this.fileDelimiter = fileDelimiter;
     }
 
+    /**
+     * formatTopicName<br>
+     * change full topic name "pulsar-9xn9wp35pbxb/test/atta_topic_1" to base topic name "atta_topic_1"<br>
+     */
+    public void formatTopicName() {
+        if (this.topicName != null) {
+            int index = this.topicName.lastIndexOf('/');
+            if (index >= 0) {
+                this.topicName = this.topicName.substring(index + 1);
+            }
+        }
+    }
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
index 449a92a..835a863 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -55,6 +55,9 @@ public class PulsarClusterProducer implements LifecycleAware {
 
     public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);
 
+    public static final String KEY_TENANT = "tenant";
+    public static final String KEY_NAMESPACE = "namespace";
+
     public static final String KEY_SERVICE_URL = "serviceUrl";
     public static final String KEY_AUTHENTICATION = "authentication";
 
@@ -81,6 +84,9 @@ public class PulsarClusterProducer implements LifecycleAware {
     private final String cacheClusterName;
     private LifecycleState state;
 
+    private String tenant;
+    private String namespace;
+
     /**
      * pulsar client
      */
@@ -103,6 +109,8 @@ public class PulsarClusterProducer implements LifecycleAware {
         this.context = context.getProducerContext();
         this.state = LifecycleState.IDLE;
         this.cacheClusterName = config.getClusterName();
+        this.tenant = config.getParams().getOrDefault(KEY_TENANT, "pulsar");
+        this.namespace = config.getParams().getOrDefault(KEY_NAMESPACE, "inlong");
     }
 
     /**
@@ -211,26 +219,27 @@ public class PulsarClusterProducer implements LifecycleAware {
     public boolean send(DispatchProfile event) {
         try {
             // topic
-            String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
-            if (topic == null) {
+            String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
+            if (baseTopic == null) {
                 sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
                 return false;
             }
             // get producer
-            Producer<byte[]> producer = this.producerMap.get(topic);
+            String producerTopic = tenant + '/' + namespace + '/' + baseTopic;
+            Producer<byte[]> producer = this.producerMap.get(producerTopic);
             if (producer == null) {
                 try {
-                    LOG.info("try to new a object for topic " + topic);
+                    LOG.info("try to new a object for topic " + producerTopic);
                     SecureRandom secureRandom = new SecureRandom(
-                            (workerName + "-" + cacheClusterName + "-" + topic + System.currentTimeMillis())
+                            (workerName + "-" + cacheClusterName + "-" + producerTopic + System.currentTimeMillis())
                                     .getBytes());
-                    String producerName = workerName + "-" + cacheClusterName + "-" + topic + "-"
+                    String producerName = workerName + "-" + cacheClusterName + "-" + producerTopic + "-"
                             + secureRandom.nextLong();
-                    producer = baseBuilder.clone().topic(topic)
+                    producer = baseBuilder.clone().topic(producerTopic)
                             .producerName(producerName)
                             .create();
                     LOG.info("create new producer success:{}", producer.getProducerName());
-                    Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
+                    Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(producerTopic, producer);
                     if (oldProducer != null) {
                         producer.close();
                         LOG.info("close producer success:{}", producer.getProducerName());
@@ -243,7 +252,7 @@ public class PulsarClusterProducer implements LifecycleAware {
             // create producer failed
             if (producer == null) {
                 sinkContext.getDispatchQueue().offer(event);
-                sinkContext.addSendResultMetric(event, topic, false, 0);
+                sinkContext.addSendResultMetric(event, producerTopic, false, 0);
                 return false;
             }
             // headers
@@ -260,9 +269,9 @@ public class PulsarClusterProducer implements LifecycleAware {
                     LOG.error("Send fail:{}", ex.getMessage());
                     LOG.error(ex.getMessage(), ex);
                     sinkContext.getDispatchQueue().offer(event);
-                    sinkContext.addSendResultMetric(event, topic, false, sendTime);
+                    sinkContext.addSendResultMetric(event, producerTopic, false, sendTime);
                 } else {
-                    sinkContext.addSendResultMetric(event, topic, true, sendTime);
+                    sinkContext.addSendResultMetric(event, producerTopic, true, sendTime);
                 }
             });
             return true;