You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/05/09 07:58:58 UTC
[pulsar-client-node] branch master updated: [feat]: Support client.getPartitionsForTopic (#320) (#322)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new a31b29a [feat]: Support client.getPartitionsForTopic (#320) (#322)
a31b29a is described below
commit a31b29a1a17763de1939a6e21a6dd86c2c16c9f3
Author: Keith <mr...@outlook.com>
AuthorDate: Tue May 9 15:58:52 2023 +0800
[feat]: Support client.getPartitionsForTopic (#320) (#322)
* [feat]: Support client.getPartitionsForTopic (#320)
* [feat]: Support client.getPartitionsForTopic, fix ut
---
index.d.ts | 1 +
src/Client.cc | 36 ++++++++++++++++++++++
src/Client.h | 1 +
src/Client.js | 4 +++
tests/client.test.js | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 127 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index d1990e4..ca1e489 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -39,6 +39,7 @@ export class Client {
createProducer(config: ProducerConfig): Promise<Producer>;
subscribe(config: ConsumerConfig): Promise<Consumer>;
createReader(config: ReaderConfig): Promise<Reader>;
+ getPartitionsForTopic(topic: string): Promise<string[]>;
close(): Promise<null>;
}
diff --git a/src/Client.cc b/src/Client.cc
index 94b84a5..f557a08 100644
--- a/src/Client.cc
+++ b/src/Client.cc
@@ -79,6 +79,7 @@ Napi::Object Client::Init(Napi::Env env, Napi::Object exports) {
{StaticMethod("setLogHandler", &Client::SetLogHandler),
InstanceMethod("createProducer", &Client::CreateProducer),
InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("createReader", &Client::CreateReader),
+ InstanceMethod("getPartitionsForTopic", &Client::GetPartitionsForTopic),
InstanceMethod("close", &Client::Close)});
constructor = Napi::Persistent(func);
@@ -206,6 +207,41 @@ Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) {
return Reader::NewInstance(info, this->cClient);
}
+/**
+ * Asynchronously looks up the partitions of a topic.
+ *
+ * Takes the topic name from info[0] and returns the promise of a ThreadSafeDeferred. The promise
+ * resolves with a JS array holding one string per entry of the list delivered to the callback, and
+ * rejects with "Failed to GetPartitionsForTopic: <result string>" otherwise.
+ *
+ * NOTE(review): info[0] is converted with As<Napi::String>() without a type check, so callers are
+ * expected to pass a string.
+ */
+Napi::Value Client::GetPartitionsForTopic(const Napi::CallbackInfo &info) {
+  Napi::String topicString = info[0].As<Napi::String>();
+  std::string topic = topicString.Utf8Value();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  // Heap-allocated so it can travel through the C callback's void* argument; deleted in the callback.
+  auto ctx = new ExtDeferredContext(deferred);
+
+  pulsar_client_get_topic_partitions_async(
+      this->cClient.get(), topic.c_str(),
+      [](pulsar_result result, pulsar_string_list_t *topicList, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+        auto deferred = deferredContext->deferred;
+        delete deferredContext;
+
+        if (result == pulsar_result_Ok && topicList != nullptr) {
+          deferred->Resolve([topicList](const Napi::Env env) {
+            int listSize = pulsar_string_list_size(topicList);
+            Napi::Array jsArray = Napi::Array::New(env, listSize);
+
+            for (int i = 0; i < listSize; i++) {
+              const char *str = pulsar_string_list_get(topicList, i);
+              jsArray.Set(i, Napi::String::New(env, str));
+            }
+
+            // Every entry has been copied into a JS string, so the C list is no longer needed.
+            // Nothing else releases it (ownership assumed to pass to this callback - see
+            // pulsar/c/string_list.h); without this call each lookup leaks the list.
+            pulsar_string_list_free(topicList);
+            return jsArray;
+          });
+        } else {
+          // Do not leak a list that arrives together with a non-Ok result.
+          if (topicList != nullptr) {
+            pulsar_string_list_free(topicList);
+          }
+          deferred->Reject(std::string("Failed to GetPartitionsForTopic: ") + pulsar_result_str(result));
+        }
+      },
+      ctx);
+
+  return deferred->Promise();
+}
+
void LogMessageProxy(Napi::Env env, Napi::Function jsCallback, struct LogMessage *logMessage) {
Napi::Number logLevel = Napi::Number::New(env, static_cast<double>(logMessage->level));
Napi::String file = Napi::String::New(env, logMessage->file);
diff --git a/src/Client.h b/src/Client.h
index 3755b6c..8d2ba03 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -57,6 +57,7 @@ class Client : public Napi::ObjectWrap<Client> {
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Napi::Value Subscribe(const Napi::CallbackInfo &info);
Napi::Value CreateReader(const Napi::CallbackInfo &info);
+ Napi::Value GetPartitionsForTopic(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
};
diff --git a/src/Client.js b/src/Client.js
index 5932e13..2ddbfc1 100644
--- a/src/Client.js
+++ b/src/Client.js
@@ -44,6 +44,10 @@ class Client {
return this.client.createReader(params);
}
+  /**
+   * Forwards the call to the wrapped client's getPartitionsForTopic and hands its
+   * result back unchanged.
+   *
+   * @param params - passed through as the only argument (index.d.ts types it as the topic string)
+   * @returns whatever the wrapped client returns (declared as Promise<string[]> in index.d.ts)
+   */
+  getPartitionsForTopic(params) {
+    const partitions = this.client.getPartitionsForTopic(params);
+    return partitions;
+  }
+
/**
 * Closes the wrapped client.
 *
 * The wrapped client's return value is passed back instead of being discarded, so that
 * `await client.close()` waits for it; index.d.ts declares close(): Promise<null>.
 *
 * @returns whatever the wrapped client's close() returns
 */
close() {
  return this.client.close();
}
diff --git a/tests/client.test.js b/tests/client.test.js
index 03e46d1..30ce440 100644
--- a/tests/client.test.js
+++ b/tests/client.test.js
@@ -17,8 +17,33 @@
* under the License.
*/
+const http = require('http');
const Pulsar = require('../index.js');
+const baseUrl = 'http://localhost:8080';
+/**
+ * Sends a single HTTP request and collects the whole response.
+ *
+ * @param {string} url - full URL of the endpoint to call
+ * @param {object} [options] - may be omitted entirely
+ * @param {object} [options.headers] - request headers
+ * @param {*} [options.data={}] - payload; always JSON-serialised and written as the request body
+ * @param {string} [options.method='PUT'] - HTTP method
+ * @returns {Promise<{responseBody: string, statusCode: number}>} resolves once the response has
+ *   ended (whatever its status code); rejects on a request or response stream error
+ */
+const requestAdminApi = (url, { headers, data = {}, method = 'PUT' } = {}) => new Promise((resolve, reject) => {
+  const req = http.request(url, { headers, method }, (res) => {
+    let responseBody = '';
+    // Decode as UTF-8 in the stream so a multi-byte character split across two chunks
+    // is not corrupted by converting each Buffer to a string on its own.
+    res.setEncoding('utf8');
+    res.on('data', (chunk) => {
+      responseBody += chunk;
+    });
+    // Without this, a failure while reading the response would leave the promise pending.
+    res.on('error', reject);
+    res.on('end', () => {
+      resolve({ responseBody, statusCode: res.statusCode });
+    });
+  });
+
+  req.on('error', reject);
+
+  // end(chunk) writes the body and finishes the request in one step.
+  req.end(JSON.stringify(data));
+});
+
(() => {
describe('Client', () => {
describe('CreateFailedByUrlSetIncorrect', () => {
@@ -49,5 +74,65 @@ const Pulsar = require('../index.js');
})).toThrow('Service URL is required and must be specified as a string');
});
});
+    // Integration tests for client.getPartitionsForTopic. They use the service URL
+    // pulsar://localhost:6650 and the admin REST API at baseUrl. Each test closes its client in a
+    // `finally` block so that a failed assertion does not leave a connected client behind.
+    describe('test getPartitionsForTopic', () => {
+      test('GetPartitions for empty topic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+
+        try {
+          await expect(client.getPartitionsForTopic(''))
+            .rejects.toThrow('Failed to GetPartitionsForTopic: InvalidTopicName');
+        } finally {
+          await client.close();
+        }
+      });
+
+      test('Client/getPartitionsForTopic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+
+        try {
+          // test on nonPartitionedTopic
+          const nonPartitionedTopicName = 'test-non-partitioned-topic';
+          const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`;
+          const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
+          const createNonPartitionedTopicRes = await requestAdminApi(nonPartitionedTopicAdminURL, {
+            headers: {
+              'Content-Type': 'application/json',
+            },
+          });
+          expect(createNonPartitionedTopicRes.statusCode).toBe(204);
+
+          // a non-partitioned topic is reported as a single entry: the topic itself
+          const nonPartitionedTopicList = await client.getPartitionsForTopic(nonPartitionedTopic);
+          expect(nonPartitionedTopicList).toEqual([nonPartitionedTopic]);
+
+          // test on partitioned with number
+          const partitionedTopicName = 'test-partitioned-topic-1';
+          const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
+          const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
+          const createPartitionedTopicRes = await requestAdminApi(partitionedTopicAdminURL, {
+            headers: {
+              'Content-Type': 'text/plain',
+            },
+            data: 4,
+          });
+          expect(createPartitionedTopicRes.statusCode).toBe(204);
+
+          const partitionedTopicList = await client.getPartitionsForTopic(partitionedTopic);
+          expect(partitionedTopicList).toEqual([
+            'persistent://public/default/test-partitioned-topic-1-partition-0',
+            'persistent://public/default/test-partitioned-topic-1-partition-1',
+            'persistent://public/default/test-partitioned-topic-1-partition-2',
+            'persistent://public/default/test-partitioned-topic-1-partition-3',
+          ]);
+
+          // remove both topics again
+          const deleteNonPartitionedTopicRes = await requestAdminApi(nonPartitionedTopicAdminURL, { method: 'DELETE' });
+          expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
+          const deletePartitionedTopicRes = await requestAdminApi(partitionedTopicAdminURL, { method: 'DELETE' });
+          expect(deletePartitionedTopicRes.statusCode).toBe(204);
+        } finally {
+          await client.close();
+        }
+      });
+    });
});
})();