You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/04/11 14:16:37 UTC
[pulsar-client-node] 02/02: [Fix] Fix reader message listener doesn't respect receiver queue size (#316)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit c5fccfcc97f147f01fdc1c81fadf1dec8e2ed502
Author: Zike Yang <zi...@apache.org>
AuthorDate: Tue Apr 11 21:21:53 2023 +0800
[Fix] Fix reader message listener doesn't respect receiver queue size (#316)
(cherry picked from commit f38321aaf480638c2a99b4afa5f0c6d24dce4973)
---
src/Reader.cc | 32 +++++++++++++++++++++-----
tests/end_to_end.test.js | 58 ++++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 82 insertions(+), 8 deletions(-)
diff --git a/src/Reader.cc b/src/Reader.cc
index 74bd4b2..5b9d8a7 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -26,6 +26,7 @@
#include <pulsar/c/reader.h>
#include <atomic>
#include <thread>
+#include <future>
Napi::FunctionReference Reader::constructor;
@@ -49,17 +50,30 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
struct ReaderListenerProxyData {
std::shared_ptr<pulsar_message_t> cMessage;
Reader *reader;
+ std::function<void(void)> callback;
- ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader)
- : cMessage(cMessage), reader(reader) {}
+ ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader,
+ std::function<void(void)> callback)
+ : cMessage(cMessage), reader(reader), callback(callback) {}
};
void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Reader *reader = data->reader;
- delete data;
- jsCallback.Call({msg, reader->Value()});
+ Napi::Value ret = jsCallback.Call({msg, reader->Value()});
+ if (ret.IsPromise()) {
+ Napi::Promise promise = ret.As<Napi::Promise>();
+ Napi::Value thenValue = promise.Get("then");
+ if (thenValue.IsFunction()) {
+ Napi::Function then = thenValue.As<Napi::Function>();
+ Napi::Function callback =
+ Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
+ then.Call(promise, {callback});
+ return;
+ }
+ }
+ data->callback();
}
void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, void *ctx) {
@@ -69,9 +83,15 @@ void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, vo
if (readerListenerCallback->callback.Acquire() != napi_ok) {
return;
}
- ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader);
- readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy);
+
+ std::promise<void> promise;
+ std::future<void> future = promise.get_future();
+ std::unique_ptr<ReaderListenerProxyData> dataPtr(
+ new ReaderListenerProxyData(cMessage, reader, [&promise]() { promise.set_value(); }));
+ readerListenerCallback->callback.BlockingCall(dataPtr.get(), ReaderListenerProxy);
readerListenerCallback->callback.Release();
+
+ future.wait();
}
void Reader::SetCReader(std::shared_ptr<pulsar_reader_t> cReader) { this->cReader = cReader; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 373ee47..7a5621d 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -310,9 +310,9 @@ const Pulsar = require('../index.js');
let consumer2Recv = 0;
while (true) {
- await new Promise((resolve) => setTimeout(resolve, 10));
try {
const msg = await consumer2.receive(3000);
+ await new Promise((resolve) => setTimeout(resolve, 10));
consumer2Recv += 1;
await consumer2.acknowledge(msg);
} catch (err) {
@@ -324,7 +324,7 @@ const Pulsar = require('../index.js');
// the receiver queue size messages.
// This way any of the consumers will not immediately empty all messages of a topic.
expect(consumer1Recv).toBeGreaterThan(10);
- expect(consumer1Recv).toBeGreaterThan(10);
+ expect(consumer2Recv).toBeGreaterThan(10);
await consumer1.close();
await consumer2.close();
@@ -332,6 +332,60 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('Share readers with message listener', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'test-shared-reader-listener';
+ const producer = await client.createProducer({
+ topic,
+ batchingEnabled: false,
+ });
+
+ for (let i = 0; i < 100; i += 1) {
+ await producer.send(i);
+ }
+
+ let reader1Recv = 0;
+
+ const reader1 = await client.createReader({
+ topic,
+ startMessageId: Pulsar.MessageId.earliest(),
+ receiverQueueSize: 10,
+ listener: async (message, reader) => {
+ await new Promise((resolve) => setTimeout(resolve, 10));
+ reader1Recv += 1;
+ },
+ });
+
+ const reader2 = await client.createReader({
+ topic,
+ startMessageId: Pulsar.MessageId.earliest(),
+ receiverQueueSize: 10,
+ });
+
+ let reader2Recv = 0;
+
+ while (reader2.hasNext()) {
+ await reader2.readNext();
+ await new Promise((resolve) => setTimeout(resolve, 10));
+ reader2Recv += 1;
+ }
+
+ // Ensure that each reader receives strictly more messages than the
+ // receiver queue size (greater than, not equal to, 10).
+ // This shows that neither reader immediately drained all messages of the topic.
+ expect(reader1Recv).toBeGreaterThan(10);
+ expect(reader2Recv).toBeGreaterThan(10);
+
+ await reader1.close();
+ await reader2.close();
+ await producer.close();
+ await client.close();
+ });
+
test('acknowledgeCumulative', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',