You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/05/09 07:58:58 UTC

[pulsar-client-node] branch master updated: [feat]: Support client.getPartitionsForTopic (#320) (#322)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new a31b29a  [feat]: Support client.getPartitionsForTopic (#320) (#322)
a31b29a is described below

commit a31b29a1a17763de1939a6e21a6dd86c2c16c9f3
Author: Keith <mr...@outlook.com>
AuthorDate: Tue May 9 15:58:52 2023 +0800

    [feat]: Support client.getPartitionsForTopic (#320) (#322)
    
    * [feat]: Support client.getPartitionsForTopic (#320)
    
    * [feat]: Support client.getPartitionsForTopic, fix ut
---
 index.d.ts           |  1 +
 src/Client.cc        | 36 ++++++++++++++++++++++
 src/Client.h         |  1 +
 src/Client.js        |  4 +++
 tests/client.test.js | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 127 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index d1990e4..ca1e489 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -39,6 +39,7 @@ export class Client {
   createProducer(config: ProducerConfig): Promise<Producer>;
   subscribe(config: ConsumerConfig): Promise<Consumer>;
   createReader(config: ReaderConfig): Promise<Reader>;
+  getPartitionsForTopic(topic: string): Promise<string[]>;
   close(): Promise<null>;
 }
 
diff --git a/src/Client.cc b/src/Client.cc
index 94b84a5..f557a08 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -79,6 +79,7 @@ Napi::Object Client::Init(Napi::Env env, Napi::Object exports) {
       {StaticMethod("setLogHandler", &Client::SetLogHandler),
        InstanceMethod("createProducer", &Client::CreateProducer),
        InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("createReader", &Client::CreateReader),
+       InstanceMethod("getPartitionsForTopic", &Client::GetPartitionsForTopic),
        InstanceMethod("close", &Client::Close)});
 
   constructor = Napi::Persistent(func);
@@ -206,6 +207,41 @@ Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
   return Reader::NewInstance(info, this->cClient);
 }
 
+Napi::Value Client::GetPartitionsForTopic(const Napi::CallbackInfo &info) {
+  Napi::String topicString = info[0].As<Napi::String>();
+  std::string topic = topicString.Utf8Value();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_client_get_topic_partitions_async(
+      this->cClient.get(), topic.c_str(),
+      [](pulsar_result result, pulsar_string_list_t *topicList, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result == pulsar_result_Ok && topicList != nullptr) {
+          deferred->Resolve([topicList](const Napi::Env env) {
+            int listSize = pulsar_string_list_size(topicList);
+            Napi::Array jsArray = Napi::Array::New(env, listSize);
+
+            for (int i = 0; i < listSize; i++) {
+              const char *str = pulsar_string_list_get(topicList, i);
+              jsArray.Set(i, Napi::String::New(env, str));
+            }
+
+            return jsArray;
+          });
+        } else {
+          deferred->Reject(std::string("Failed to GetPartitionsForTopic: ") + pulsar_result_str(result));
+        }
+      },
+
+      ctx);
+
+  return deferred->Promise();
+}
+
 void LogMessageProxy(Napi::Env env, Napi::Function jsCallback, struct LogMessage *logMessage) {
   Napi::Number logLevel = Napi::Number::New(env, static_cast<double>(logMessage->level));
   Napi::String file = Napi::String::New(env, logMessage->file);
diff --git a/src/Client.h b/src/Client.h
index 3755b6c..8d2ba03 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -57,6 +57,7 @@ class Client : public Napi::ObjectWrap<Client> {
   Napi::Value CreateProducer(const Napi::CallbackInfo &info);
   Napi::Value Subscribe(const Napi::CallbackInfo &info);
   Napi::Value CreateReader(const Napi::CallbackInfo &info);
+  Napi::Value GetPartitionsForTopic(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
 };
 
diff --git a/src/Client.js b/src/Client.js
index 5932e13..2ddbfc1 100644
--- a/src/Client.js
+++ b/src/Client.js
@@ -44,6 +44,10 @@ class Client {
     return this.client.createReader(params);
   }
 
+  getPartitionsForTopic(params) {
+    return this.client.getPartitionsForTopic(params);
+  }
+
   close() {
     this.client.close();
   }
diff --git a/tests/client.test.js b/tests/client.test.js
index 03e46d1..30ce440 100644
--- a/tests/client.test.js
+++ b/tests/client.test.js
@@ -17,8 +17,33 @@
  * under the License.
  */
 
+const http = require('http');
 const Pulsar = require('../index.js');
 
+const baseUrl = 'http://localhost:8080';
+const requestAdminApi = (url, { headers, data = {}, method = 'PUT' }) => new Promise((resolve, reject) => {
+  const req = http.request(url, {
+    headers,
+    method,
+  }, (res) => {
+    let responseBody = '';
+    res.on('data', (chunk) => {
+      responseBody += chunk;
+    });
+    res.on('end', () => {
+      resolve({ responseBody, statusCode: res.statusCode });
+    });
+  });
+
+  req.on('error', (error) => {
+    reject(error);
+  });
+
+  req.write(JSON.stringify(data));
+
+  req.end();
+});
+
 (() => {
   describe('Client', () => {
     describe('CreateFailedByUrlSetIncorrect', () => {
@@ -49,5 +74,65 @@ const Pulsar = require('../index.js');
         })).toThrow('Service URL is required and must be specified as a string');
       });
     });
+    describe('test getPartitionsForTopic', () => {
+      test('GetPartitions for empty topic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+
+        await expect(client.getPartitionsForTopic(''))
+          .rejects.toThrow('Failed to GetPartitionsForTopic: InvalidTopicName');
+        await client.close();
+      });
+
+      test('Client/getPartitionsForTopic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+
+        // test on nonPartitionedTopic
+        const nonPartitionedTopicName = 'test-non-partitioned-topic';
+        const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`;
+        const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
+        const createNonPartitionedTopicRes = await requestAdminApi(nonPartitionedTopicAdminURL, {
+          headers: {
+            'Content-Type': 'application/json',
+          },
+        });
+        expect(createNonPartitionedTopicRes.statusCode).toBe(204);
+
+        const nonPartitionedTopicList = await client.getPartitionsForTopic(nonPartitionedTopic);
+        expect(nonPartitionedTopicList).toEqual([nonPartitionedTopic]);
+
+        // test on partitioned with number
+        const partitionedTopicName = 'test-partitioned-topic-1';
+        const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
+        const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
+        const createPartitionedTopicRes = await requestAdminApi(partitionedTopicAdminURL, {
+          headers: {
+            'Content-Type': 'text/plain',
+          },
+          data: 4,
+        });
+        expect(createPartitionedTopicRes.statusCode).toBe(204);
+
+        const partitionedTopicList = await client.getPartitionsForTopic(partitionedTopic);
+        expect(partitionedTopicList).toEqual([
+          'persistent://public/default/test-partitioned-topic-1-partition-0',
+          'persistent://public/default/test-partitioned-topic-1-partition-1',
+          'persistent://public/default/test-partitioned-topic-1-partition-2',
+          'persistent://public/default/test-partitioned-topic-1-partition-3',
+        ]);
+
+        const deleteNonPartitionedTopicRes = await requestAdminApi(nonPartitionedTopicAdminURL, { method: 'DELETE' });
+        expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
+        const deletePartitionedTopicRes = await requestAdminApi(partitionedTopicAdminURL, { method: 'DELETE' });
+        expect(deletePartitionedTopicRes.statusCode).toBe(204);
+
+        await client.close();
+      });
+    });
   });
 })();