You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/04/11 14:16:37 UTC

[pulsar-client-node] 02/02: [Fix] Fix reader message listener doesn't respect receiver queue size (#316)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit c5fccfcc97f147f01fdc1c81fadf1dec8e2ed502
Author: Zike Yang <zi...@apache.org>
AuthorDate: Tue Apr 11 21:21:53 2023 +0800

    [Fix] Fix reader message listener doesn't respect receiver queue size (#316)
    
    (cherry picked from commit f38321aaf480638c2a99b4afa5f0c6d24dce4973)
---
 src/Reader.cc            | 32 +++++++++++++++++++++-----
 tests/end_to_end.test.js | 58 ++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/src/Reader.cc b/src/Reader.cc
index 74bd4b2..5b9d8a7 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -26,6 +26,7 @@
 #include <pulsar/c/reader.h>
 #include <atomic>
 #include <thread>
+#include <future>
 
 Napi::FunctionReference Reader::constructor;
 
@@ -49,17 +50,30 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
 struct ReaderListenerProxyData {
   std::shared_ptr<pulsar_message_t> cMessage;
   Reader *reader;
+  std::function<void(void)> callback;
 
-  ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader)
-      : cMessage(cMessage), reader(reader) {}
+  ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader,
+                          std::function<void(void)> callback)
+      : cMessage(cMessage), reader(reader), callback(callback) {}
 };
 
 void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Reader *reader = data->reader;
-  delete data;
 
-  jsCallback.Call({msg, reader->Value()});
+  Napi::Value ret = jsCallback.Call({msg, reader->Value()});
+  if (ret.IsPromise()) {
+    Napi::Promise promise = ret.As<Napi::Promise>();
+    Napi::Value thenValue = promise.Get("then");
+    if (thenValue.IsFunction()) {
+      Napi::Function then = thenValue.As<Napi::Function>();
+      Napi::Function callback =
+          Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
+      then.Call(promise, {callback});
+      return;
+    }
+  }
+  data->callback();
 }
 
 void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, void *ctx) {
@@ -69,9 +83,15 @@ void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, vo
   if (readerListenerCallback->callback.Acquire() != napi_ok) {
     return;
   }
-  ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader);
-  readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy);
+
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  std::unique_ptr<ReaderListenerProxyData> dataPtr(
+      new ReaderListenerProxyData(cMessage, reader, [&promise]() { promise.set_value(); }));
+  readerListenerCallback->callback.BlockingCall(dataPtr.get(), ReaderListenerProxy);
   readerListenerCallback->callback.Release();
+
+  future.wait();
 }
 
 void Reader::SetCReader(std::shared_ptr<pulsar_reader_t> cReader) { this->cReader = cReader; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 373ee47..7a5621d 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -310,9 +310,9 @@ const Pulsar = require('../index.js');
 
       let consumer2Recv = 0;
       while (true) {
-        await new Promise((resolve) => setTimeout(resolve, 10));
         try {
           const msg = await consumer2.receive(3000);
+          await new Promise((resolve) => setTimeout(resolve, 10));
           consumer2Recv += 1;
           await consumer2.acknowledge(msg);
         } catch (err) {
@@ -324,7 +324,7 @@ const Pulsar = require('../index.js');
       // the receiver queue size messages.
       // This way any of the consumers will not immediately empty all messages of a topic.
       expect(consumer1Recv).toBeGreaterThan(10);
-      expect(consumer1Recv).toBeGreaterThan(10);
+      expect(consumer2Recv).toBeGreaterThan(10);
 
       await consumer1.close();
       await consumer2.close();
@@ -332,6 +332,60 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Share readers with message listener', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'test-shared-reader-listener';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+      });
+
+      for (let i = 0; i < 100; i += 1) {
+        await producer.send(i);
+      }
+
+      let reader1Recv = 0;
+
+      const reader1 = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.earliest(),
+        receiverQueueSize: 10,
+        listener: async (message, reader) => {
+          await new Promise((resolve) => setTimeout(resolve, 10));
+          reader1Recv += 1;
+        },
+      });
+
+      const reader2 = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.earliest(),
+        receiverQueueSize: 10,
+      });
+
+      let reader2Recv = 0;
+
+      while (reader2.hasNext()) {
+        await reader2.readNext();
+        await new Promise((resolve) => setTimeout(resolve, 10));
+        reader2Recv += 1;
+      }
+
+      // Ensure that each reader receives strictly more messages than the receiver
+      // queue size (greater than, not equal to).
+      // This way no single reader can immediately drain all messages of the topic.
+      expect(reader1Recv).toBeGreaterThan(10);
+      expect(reader2Recv).toBeGreaterThan(10);
+
+      await reader1.close();
+      await reader2.close();
+      await producer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',