You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2020/10/19 02:29:16 UTC

[pulsar-client-node] branch master updated: Add Multi-Topic Support to Consumer (#130)

This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new f010fb2  Add Multi-Topic Support to Consumer (#130)
f010fb2 is described below

commit f010fb2ed3c530af8006ba3d3a7b73c6b46f6507
Author: savearray2 <46...@users.noreply.github.com>
AuthorDate: Mon Oct 19 11:29:06 2020 +0900

    Add Multi-Topic Support to Consumer (#130)
    
    * Add Multi-Topic Support to Consumer
    
    Adds multi-topic support from the C++ library by utilizing pulsar_client_subscribe_multi_topics_async.
    
    * Update c_topics to cTopics
    
    Change from snake_case to camelCase
    
    Co-authored-by: savearray2 <savearray2>
---
 src/Consumer.cc          | 19 +++++++++++----
 src/ConsumerConfig.cc    | 11 +++++++++
 src/ConsumerConfig.h     |  2 ++
 tests/consumer.test.js   | 24 ++++++++++++++++---
 tests/end_to_end.test.js | 61 ++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 986fb9d..8e4a9e1 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -106,10 +106,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
   ~ConsumerNewInstanceWorker() {}
   void Execute() {
     const std::string &topic = this->consumerConfig->GetTopic();
+    const std::vector<std::string> &topics = this->consumerConfig->GetTopics();
     const std::string &topicsPattern = this->consumerConfig->GetTopicsPattern();
-    if (topic.empty() && topicsPattern.empty()) {
-      SetError(std::string(
-          "Topic or topicsPattern is required and must be specified as a string when creating consumer"));
+    if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
+      SetError(
+          std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
+                      "creating consumer"));
       return;
     }
     const std::string &subscription = this->consumerConfig->GetSubscription();
@@ -133,10 +135,19 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
     }
 
     this->done = false;
-    if (topic.empty()) {
+    if (!topicsPattern.empty()) {
       pulsar_client_subscribe_pattern_async(this->cClient, topicsPattern.c_str(), subscription.c_str(),
                                             this->consumerConfig->GetCConsumerConfig(),
                                             &ConsumerNewInstanceWorker::subscribeCallback, (void *)this);
+    } else if (topics.size() > 0) {
+      const char **cTopics = new const char *[topics.size()];  // array of borrowed c_str() pointers
+      for (size_t i = 0; i < topics.size(); i++) {
+        cTopics[i] = topics[i].c_str();
+      }
+      pulsar_client_subscribe_multi_topics_async(this->cClient, cTopics, topics.size(), subscription.c_str(),
+                                                 this->consumerConfig->GetCConsumerConfig(),
+                                                 &ConsumerNewInstanceWorker::subscribeCallback, (void *)this);
+      delete[] cTopics;  // allocated with new[], so it must be released with delete[] (scalar delete is UB)
     } else {
       pulsar_client_subscribe_async(this->cClient, topic.c_str(), subscription.c_str(),
                                     this->consumerConfig->GetCConsumerConfig(),
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index aff3736..939986d 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -25,6 +25,7 @@
 #include <map>
 
 static const std::string CFG_TOPIC = "topic";
+static const std::string CFG_TOPICS = "topics";
 static const std::string CFG_TOPICS_PATTERN = "topicsPattern";
 static const std::string CFG_SUBSCRIPTION = "subscription";
 static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
@@ -64,6 +65,15 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
     this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
   }
 
+  if (consumerConfig.Has(CFG_TOPICS) && consumerConfig.Get(CFG_TOPICS).IsArray()) {
+    auto arr = consumerConfig.Get(CFG_TOPICS).As<Napi::Array>();
+    for (uint32_t i = 0; i < arr.Length(); i++) {
+      if (arr.Get(i).IsString()) {
+        this->topics.emplace_back(arr.Get(i).ToString().Utf8Value());
+      }
+    }
+  }
+
   if (consumerConfig.Has(CFG_TOPICS_PATTERN) && consumerConfig.Get(CFG_TOPICS_PATTERN).IsString()) {
     this->topicsPattern = consumerConfig.Get(CFG_TOPICS_PATTERN).ToString().Utf8Value();
   }
@@ -165,6 +175,7 @@ ConsumerConfig::~ConsumerConfig() {
 pulsar_consumer_configuration_t *ConsumerConfig::GetCConsumerConfig() { return this->cConsumerConfig; }
 
 std::string ConsumerConfig::GetTopic() { return this->topic; }
+std::vector<std::string> ConsumerConfig::GetTopics() { return this->topics; }
 std::string ConsumerConfig::GetTopicsPattern() { return this->topicsPattern; }
 std::string ConsumerConfig::GetSubscription() { return this->subscription; }
 ListenerCallback *ConsumerConfig::GetListenerCallback() {
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index d3ef01f..7a434fe 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -32,6 +32,7 @@ class ConsumerConfig {
   ~ConsumerConfig();
   pulsar_consumer_configuration_t *GetCConsumerConfig();
   std::string GetTopic();
+  std::vector<std::string> GetTopics();
   std::string GetTopicsPattern();
   std::string GetSubscription();
   int64_t GetAckTimeoutMs();
@@ -42,6 +43,7 @@ class ConsumerConfig {
  private:
   pulsar_consumer_configuration_t *cConsumerConfig;
   std::string topic;
+  std::vector<std::string> topics;
   std::string topicsPattern;
   std::string subscription;
   int64_t ackTimeoutMs;
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index f4fede5..c152989 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -31,7 +31,7 @@ const Pulsar = require('../index.js');
           subscription: 'sub1',
           ackTimeoutMs: 10000,
           nAckRedeliverTimeoutMs: 60000,
-        })).rejects.toThrow('Topic or topicsPattern is required and must be specified as a string when creating consumer');
+        })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
       });
 
       test('Not String Topic', async () => {
@@ -40,7 +40,7 @@ const Pulsar = require('../index.js');
           subscription: 'sub1',
           ackTimeoutMs: 10000,
           nAckRedeliverTimeoutMs: 60000,
-        })).rejects.toThrow('Topic or topicsPattern is required and must be specified as a string when creating consumer');
+        })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
       });
 
       test('Not String TopicsPattern', async () => {
@@ -49,7 +49,25 @@ const Pulsar = require('../index.js');
           subscription: 'sub1',
           ackTimeoutMs: 10000,
           nAckRedeliverTimeoutMs: 60000,
-        })).rejects.toThrow('Topic or topicsPattern is required and must be specified as a string when creating consumer');
+        })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
+      });
+
+      test('Not Array Topics', async () => {
+        await expect(client.subscribe({
+          topics: 0,
+          subscription: 'sub1',
+          ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
+        })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
+      });
+
+      test('Not String in Array Topics', async () => {
+        await expect(client.subscribe({
+          topics: [0, true],
+          subscription: 'sub1',
+          ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
+        })).rejects.toThrow('Topic, topics or topicsPattern is required and must be specified as a string when creating consumer');
       });
 
       test('No Subscription', async () => {
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 41cde1f..e7fb8f6 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -604,5 +604,66 @@ const Pulsar = require('../index.js');
       await consumer.close();
       await client.close();
     });
+
+    test('Produce/Consume-Multi-Topic', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+      expect(client).not.toBeNull();
+
+      const topic1 = 'persistent://public/default/produce-mtopic-1';
+      const topic2 = 'persistent://public/default/produce-mtopic-2';
+      const producer1 = await client.createProducer({
+        topic: topic1,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer1).not.toBeNull();
+      const producer2 = await client.createProducer({
+        topic: topic2,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer2).not.toBeNull();
+
+      const consumer = await client.subscribe({
+        topics: [topic1, topic2],
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+      });
+      expect(consumer).not.toBeNull();
+
+      const messages = [];
+      for (let i = 0; i < 5; i += 1) {
+        const msg = `my-message-${i}`;
+        producer1.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer1.flush();
+      for (let i = 5; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        producer2.send({
+          data: Buffer.from(msg),
+        });
+        messages.push(msg);
+      }
+      await producer2.flush();
+
+      const results = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = await consumer.receive();
+        results.push(msg.getData().toString());
+        consumer.acknowledge(msg);
+      }
+      expect(lodash.difference(messages, results)).toEqual([]);
+
+      await producer1.close();
+      await producer2.close();
+      await consumer.close();
+      await client.close();
+    });
   });
 })();