You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/18 06:04:20 UTC
[incubator-inlong] branch master updated: [INLONG-3196][DataProxy] Support to produce message to different topic in multiple Pulsar Cache (#3197)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f969f58 [INLONG-3196][DataProxy] Support to produce message to different topic in multiple Pulsar Cache (#3197)
f969f58 is described below
commit f969f585606b2719f1f6e8951d3811a63b65eca2
Author: 卢春亮 <94...@qq.com>
AuthorDate: Fri Mar 18 14:04:11 2022 +0800
[INLONG-3196][DataProxy] Support to produce message to different topic in multiple Pulsar Cache (#3197)
---
.../config/holder/IdTopicConfigHolder.java | 1 +
.../dataproxy/config/pojo/IdTopicConfig.java | 12 +++++++++
.../sink/pulsarzone/PulsarClusterProducer.java | 31 ++++++++++++++--------
3 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
index 9c52361..019536d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
@@ -129,6 +129,7 @@ public class IdTopicConfigHolder implements Configurable {
Map<String, IdTopicConfig> newConfigMap = new ConcurrentHashMap<>();
for (IdTopicConfig config : newConfigList) {
newConfigMap.put(config.getUid(), config);
+ config.formatTopicName();
}
this.configList = newConfigList;
this.configMap = newConfigMap;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index c8a1fa3..153843e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -175,4 +175,16 @@ public class IdTopicConfig {
this.fileDelimiter = fileDelimiter;
}
+ /**
+ * formatTopicName<br>
+ * change full topic name "pulsar-9xn9wp35pbxb/test/atta_topic_1" to base topic name "atta_topic_1"<br>
+ */
+ public void formatTopicName() {
+ if (this.topicName != null) {
+ int index = this.topicName.lastIndexOf('/');
+ if (index >= 0) {
+ this.topicName = this.topicName.substring(index + 1);
+ }
+ }
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
index 449a92a..835a863 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsarzone/PulsarClusterProducer.java
@@ -55,6 +55,9 @@ public class PulsarClusterProducer implements LifecycleAware {
public static final Logger LOG = LoggerFactory.getLogger(PulsarClusterProducer.class);
+ public static final String KEY_TENANT = "tenant";
+ public static final String KEY_NAMESPACE = "namespace";
+
public static final String KEY_SERVICE_URL = "serviceUrl";
public static final String KEY_AUTHENTICATION = "authentication";
@@ -81,6 +84,9 @@ public class PulsarClusterProducer implements LifecycleAware {
private final String cacheClusterName;
private LifecycleState state;
+ private String tenant;
+ private String namespace;
+
/**
* pulsar client
*/
@@ -103,6 +109,8 @@ public class PulsarClusterProducer implements LifecycleAware {
this.context = context.getProducerContext();
this.state = LifecycleState.IDLE;
this.cacheClusterName = config.getClusterName();
+ this.tenant = config.getParams().getOrDefault(KEY_TENANT, "pulsar");
+ this.namespace = config.getParams().getOrDefault(KEY_NAMESPACE, "inlong");
}
/**
@@ -211,26 +219,27 @@ public class PulsarClusterProducer implements LifecycleAware {
public boolean send(DispatchProfile event) {
try {
// topic
- String topic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
- if (topic == null) {
+ String baseTopic = sinkContext.getIdTopicHolder().getTopic(event.getUid());
+ if (baseTopic == null) {
sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
return false;
}
// get producer
- Producer<byte[]> producer = this.producerMap.get(topic);
+ String producerTopic = tenant + '/' + namespace + '/' + baseTopic;
+ Producer<byte[]> producer = this.producerMap.get(producerTopic);
if (producer == null) {
try {
- LOG.info("try to new a object for topic " + topic);
+ LOG.info("try to new a object for topic " + producerTopic);
SecureRandom secureRandom = new SecureRandom(
- (workerName + "-" + cacheClusterName + "-" + topic + System.currentTimeMillis())
+ (workerName + "-" + cacheClusterName + "-" + producerTopic + System.currentTimeMillis())
.getBytes());
- String producerName = workerName + "-" + cacheClusterName + "-" + topic + "-"
+ String producerName = workerName + "-" + cacheClusterName + "-" + producerTopic + "-"
+ secureRandom.nextLong();
- producer = baseBuilder.clone().topic(topic)
+ producer = baseBuilder.clone().topic(producerTopic)
.producerName(producerName)
.create();
LOG.info("create new producer success:{}", producer.getProducerName());
- Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
+ Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(producerTopic, producer);
if (oldProducer != null) {
producer.close();
LOG.info("close producer success:{}", producer.getProducerName());
@@ -243,7 +252,7 @@ public class PulsarClusterProducer implements LifecycleAware {
// create producer failed
if (producer == null) {
sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, topic, false, 0);
+ sinkContext.addSendResultMetric(event, producerTopic, false, 0);
return false;
}
// headers
@@ -260,9 +269,9 @@ public class PulsarClusterProducer implements LifecycleAware {
LOG.error("Send fail:{}", ex.getMessage());
LOG.error(ex.getMessage(), ex);
sinkContext.getDispatchQueue().offer(event);
- sinkContext.addSendResultMetric(event, topic, false, sendTime);
+ sinkContext.addSendResultMetric(event, producerTopic, false, sendTime);
} else {
- sinkContext.addSendResultMetric(event, topic, true, sendTime);
+ sinkContext.addSendResultMetric(event, producerTopic, true, sendTime);
}
});
return true;