You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2020/10/19 02:29:16 UTC
[pulsar-client-node] branch master updated: Add Multi-Topic Support to Consumer (#130)
This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new f010fb2 Add Multi-Topic Support to Consumer (#130)
f010fb2 is described below
commit f010fb2ed3c530af8006ba3d3a7b73c6b46f6507
Author: savearray2 <46...@users.noreply.github.com>
AuthorDate: Mon Oct 19 11:29:06 2020 +0900
Add Multi-Topic Support to Consumer (#130)
* Add Multi-Topic Support to Consumer
Adds multi-topic support from the C++ library by utilizing pulsar_client_subscribe_multi_topics_async.
* Update c_topics to cTopics
Change from snake_case to camelCase
Co-authored-by: savearray2 <savearray2>
---
src/Consumer.cc | 19 +++++++++++----
src/ConsumerConfig.cc | 11 +++++++++
src/ConsumerConfig.h | 2 ++
tests/consumer.test.js | 24 ++++++++++++++++---
tests/end_to_end.test.js | 61 ++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 986fb9d..8e4a9e1 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -106,10 +106,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
~ConsumerNewInstanceWorker() {}
void Execute() {
const std::string &topic = this->consumerConfig->GetTopic();
+ const std::vector<std::string> &topics = this->consumerConfig->GetTopics();
const std::string &topicsPattern = this->consumerConfig->GetTopicsPattern();
- if (topic.empty() && topicsPattern.empty()) {
- SetError(std::string(
- "Topic or topicsPattern is required and must be specified as a string when creating consumer"));
+ if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
+ SetError(
+ std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
+ "creating consumer"));
return;
}
const std::string &subscription = this->consumerConfig->GetSubscription();
@@ -133,10 +135,19 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
}
this->done = false;
- if (topic.empty()) {
+ if (!topicsPattern.empty()) {
pulsar_client_subscribe_pattern_async(this->cClient, topicsPattern.c_str(), subscription.c_str(),
this->consumerConfig->GetCConsumerConfig(),
&ConsumerNewInstanceWorker::subscribeCallback, (void *)this);
+ } else if (topics.size() > 0) {
+ const char **cTopics = new const char *[topics.size()];
+ for (size_t i = 0; i < topics.size(); i++) {
+ cTopics[i] = topics[i].c_str();
+ }
+ pulsar_client_subscribe_multi_topics_async(this->cClient, cTopics, topics.size(), subscription.c_str(),
+ this->consumerConfig->GetCConsumerConfig(),
+ &ConsumerNewInstanceWorker::subscribeCallback, (void *)this);
+ delete cTopics;
} else {
pulsar_client_subscribe_async(this->cClient, topic.c_str(), subscription.c_str(),
this->consumerConfig->GetCConsumerConfig(),
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index aff3736..939986d 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -25,6 +25,7 @@
#include <map>
static const std::string CFG_TOPIC = "topic";
+static const std::string CFG_TOPICS = "topics";
static const std::string CFG_TOPICS_PATTERN = "topicsPattern";
static const std::string CFG_SUBSCRIPTION = "subscription";
static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
@@ -64,6 +65,15 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
}
+ if (consumerConfig.Has(CFG_TOPICS) && consumerConfig.Get(CFG_TOPICS).IsArray()) {
+ auto arr = consumerConfig.Get(CFG_TOPICS).As<Napi::Array>();
+ for (uint32_t i = 0; i < arr.Length(); i++) {
+ if (arr.Get(i).IsString()) {
+ this->topics.emplace_back(arr.Get(i).ToString().Utf8Value());
+ }
+ }
+ }
+
if (consumerConfig.Has(CFG_TOPICS_PATTERN) && consumerConfig.Get(CFG_TOPICS_PATTERN).IsString()) {
this->topicsPattern = consumerConfig.Get(CFG_TOPICS_PATTERN).ToString().Utf8Value();
}
@@ -165,6 +175,7 @@ ConsumerConfig::~ConsumerConfig() {
pulsar_consumer_configuration_t *ConsumerConfig::GetCConsumerConfig() { return this->cConsumerConfig; }
std::string ConsumerConfig::GetTopic() { return this->topic; }
+std::vector<std::string> ConsumerConfig::GetTopics() { return this->topics; }
std::string ConsumerConfig::GetTopicsPattern() { return this->topicsPattern; }
std::string ConsumerConfig::GetSubscription() { return this->subscription; }
ListenerCallback *ConsumerConfig::GetListenerCallback() {
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index d3ef01f..7a434fe 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -32,6 +32,7 @@ class ConsumerConfig {
~ConsumerConfig();
pulsar_consumer_configuration_t *GetCConsumerConfig();
std::string GetTopic();
+ std::vector<std::string> GetTopics();
std::string GetTopicsPattern();
std::string GetSubscription();
int64_t GetAckTimeoutMs();
@@ -42,6 +43,7 @@ class ConsumerConfig {
private:
pulsar_consumer_configuration_t *cConsumerConfig;
std::string topic;
+ std::vector<std::string> topics;
std::string topicsPattern;
std::string subscription;
int64_t ackTimeoutMs;
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index f4fede5..c152989 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -31,7 +31,7 @@ const Pulsar = require('../index.js');
subscription: 'sub1',
ackTimeoutMs: 10000,
nAckRedeliverTimeoutMs: 60000,
- })).rejects.toThrow('Topic or topicsPattern is required and must be specified as a string when creating consumer');
+ })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
});
test('Not String Topic', async () => {
@@ -40,7 +40,7 @@ const Pulsar = require('../index.js');
subscription: 'sub1',
ackTimeoutMs: 10000,
nAckRedeliverTimeoutMs: 60000,
- })).rejects.toThrow('Topic or topicsPattern is required and must be specified as a string when creating consumer');
+ })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
});
test('Not String TopicsPattern', async () => {
@@ -49,7 +49,25 @@ const Pulsar = require('../index.js');
subscription: 'sub1',
ackTimeoutMs: 10000,
nAckRedeliverTimeoutMs: 60000,
- })).rejects.toThrow('Topic or topicsPattern is required and must be specified as a string when creating consumer');
+ })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
+ });
+
+ test('Not Array Topics', async () => {
+ await expect(client.subscribe({
+ topics: 0,
+ subscription: 'sub1',
+ ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
+ })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
+ });
+
+ test('Not String in Array Topics', async () => {
+ await expect(client.subscribe({
+ topics: [0, true],
+ subscription: 'sub1',
+ ackTimeoutMs: 10000,
+ nAckRedeliverTimeoutMs: 60000,
+ })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
});
test('No Subscription', async () => {
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 41cde1f..e7fb8f6 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -604,5 +604,66 @@ const Pulsar = require('../index.js');
await consumer.close();
await client.close();
});
+
+ test('Produce/Consume-Multi-Topic', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ expect(client).not.toBeNull();
+
+ const topic1 = 'persistent://public/default/produce-mtopic-1';
+ const topic2 = 'persistent://public/default/produce-mtopic-2';
+ const producer1 = await client.createProducer({
+ topic: topic1,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer1).not.toBeNull();
+ const producer2 = await client.createProducer({
+ topic: topic2,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer2).not.toBeNull();
+
+ const consumer = await client.subscribe({
+ topics: [topic1, topic2],
+ subscription: 'sub',
+ subscriptionType: 'Shared',
+ });
+ expect(consumer).not.toBeNull();
+
+ const messages = [];
+ for (let i = 0; i < 5; i += 1) {
+ const msg = `my-message-${i}`;
+ producer1.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer1.flush();
+ for (let i = 5; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer2.send({
+ data: Buffer.from(msg),
+ });
+ messages.push(msg);
+ }
+ await producer2.flush();
+
+ const results = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await consumer.receive();
+ results.push(msg.getData().toString());
+ consumer.acknowledge(msg);
+ }
+ expect(lodash.difference(messages, results)).toEqual([]);
+
+ await producer1.close();
+ await producer2.close();
+ await consumer.close();
+ await client.close();
+ });
});
})();