You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/04/11 14:16:35 UTC

[pulsar-client-node] branch branch-1.8 updated (769d7cc -> c5fccfc)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a change to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


    from 769d7cc  Release v1.8.2
     new c2a506c  [Fix] Fix message listener doesn't respect receiver queue size (#309)
     new c5fccfc  [Fix] Fix reader message listener doesn't respect receiver queue size (#316)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/Consumer.cc          |  31 ++++++++++---
 src/Reader.cc            |  32 ++++++++++---
 tests/end_to_end.test.js | 117 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 168 insertions(+), 12 deletions(-)


[pulsar-client-node] 02/02: [Fix] Fix reader message listener doesn't respect receiver queue size (#316)

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit c5fccfcc97f147f01fdc1c81fadf1dec8e2ed502
Author: Zike Yang <zi...@apache.org>
AuthorDate: Tue Apr 11 21:21:53 2023 +0800

    [Fix] Fix reader message listener doesn't respect receiver queue size (#316)
    
    (cherry picked from commit f38321aaf480638c2a99b4afa5f0c6d24dce4973)
---
 src/Reader.cc            | 32 +++++++++++++++++++++-----
 tests/end_to_end.test.js | 58 ++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/src/Reader.cc b/src/Reader.cc
index 74bd4b2..5b9d8a7 100644
--- a/src/Reader.cc
+++ b/src/Reader.cc
@@ -26,6 +26,7 @@
 #include <pulsar/c/reader.h>
 #include <atomic>
 #include <thread>
+#include <future>
 
 Napi::FunctionReference Reader::constructor;
 
@@ -49,17 +50,30 @@ void Reader::Init(Napi::Env env, Napi::Object exports) {
 struct ReaderListenerProxyData {
   std::shared_ptr<pulsar_message_t> cMessage;
   Reader *reader;
+  std::function<void(void)> callback;
 
-  ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader)
-      : cMessage(cMessage), reader(reader) {}
+  ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader,
+                          std::function<void(void)> callback)
+      : cMessage(cMessage), reader(reader), callback(callback) {}
 };
 
 void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Reader *reader = data->reader;
-  delete data;
 
-  jsCallback.Call({msg, reader->Value()});
+  Napi::Value ret = jsCallback.Call({msg, reader->Value()});
+  if (ret.IsPromise()) {
+    Napi::Promise promise = ret.As<Napi::Promise>();
+    Napi::Value thenValue = promise.Get("then");
+    if (thenValue.IsFunction()) {
+      Napi::Function then = thenValue.As<Napi::Function>();
+      Napi::Function callback =
+          Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
+      then.Call(promise, {callback});
+      return;
+    }
+  }
+  data->callback();
 }
 
 void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, void *ctx) {
@@ -69,9 +83,15 @@ void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, vo
   if (readerListenerCallback->callback.Acquire() != napi_ok) {
     return;
   }
-  ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader);
-  readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy);
+
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  std::unique_ptr<ReaderListenerProxyData> dataPtr(
+      new ReaderListenerProxyData(cMessage, reader, [&promise]() { promise.set_value(); }));
+  readerListenerCallback->callback.BlockingCall(dataPtr.get(), ReaderListenerProxy);
   readerListenerCallback->callback.Release();
+
+  future.wait();
 }
 
 void Reader::SetCReader(std::shared_ptr<pulsar_reader_t> cReader) { this->cReader = cReader; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 373ee47..7a5621d 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -310,9 +310,9 @@ const Pulsar = require('../index.js');
 
       let consumer2Recv = 0;
       while (true) {
-        await new Promise((resolve) => setTimeout(resolve, 10));
         try {
           const msg = await consumer2.receive(3000);
+          await new Promise((resolve) => setTimeout(resolve, 10));
           consumer2Recv += 1;
           await consumer2.acknowledge(msg);
         } catch (err) {
@@ -324,7 +324,7 @@ const Pulsar = require('../index.js');
       // the receiver queue size messages.
       // This way any of the consumers will not immediately empty all messages of a topic.
       expect(consumer1Recv).toBeGreaterThan(10);
-      expect(consumer1Recv).toBeGreaterThan(10);
+      expect(consumer2Recv).toBeGreaterThan(10);
 
       await consumer1.close();
       await consumer2.close();
@@ -332,6 +332,60 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Share readers with message listener', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'test-shared-reader-listener';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+      });
+
+      for (let i = 0; i < 100; i += 1) {
+        await producer.send(i);
+      }
+
+      let reader1Recv = 0;
+
+      const reader1 = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.earliest(),
+        receiverQueueSize: 10,
+        listener: async (message, reader) => {
+          await new Promise((resolve) => setTimeout(resolve, 10));
+          reader1Recv += 1;
+        },
+      });
+
+      const reader2 = await client.createReader({
+        topic,
+        startMessageId: Pulsar.MessageId.earliest(),
+        receiverQueueSize: 10,
+      });
+
+      let reader2Recv = 0;
+
+      while (reader2.hasNext()) {
+        await reader2.readNext();
+        await new Promise((resolve) => setTimeout(resolve, 10));
+        reader2Recv += 1;
+      }
+
+      // Ensure that each reader receives at least 1 times (greater than and not equal)
+      // the receiver queue size messages.
+      // This way any of the readers will not immediately empty all messages of a topic.
+      expect(reader1Recv).toBeGreaterThan(10);
+      expect(reader2Recv).toBeGreaterThan(10);
+
+      await reader1.close();
+      await reader2.close();
+      await producer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',


[pulsar-client-node] 01/02: [Fix] Fix message listener doesn't respect receiver queue size (#309)

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit c2a506c223959a364e6464aa3a9254d59ec21646
Author: Zike Yang <zi...@apache.org>
AuthorDate: Mon Apr 10 21:01:02 2023 +0800

    [Fix] Fix message listener doesn't respect receiver queue size (#309)
    
    Fixes #308
    
    ### Motivation
    
    This bug is in the Node.js client, not the C++ client. The root cause is that the message listener invokes the user callback asynchronously: it does not wait for the user callback to finish, so it immediately goes on to process the next messages.
    
    ### Modifications
    
    * Use Napi::Promise to wait for the user callback to complete in the message listener.
    
    The message listener can detect whether the user function is asynchronous or synchronous.
    * If it is a synchronous function, it will behave the same as before. This is not an issue because `Napi::Function::Call` will wait for the synchronous function to complete.
    * If it is an asynchronous function, the user function must return a `Promise` object. We use `promise.then()` to determine when the user function has finished. The message listener waits in the C++ client thread; we should not wait in the Node thread because that would block the entire Node main thread. This way, we can also make use of the `MessageListenerThreads` feature. Previously, the `MessageListenerThreads` configuration had no effect for asynchronous callbacks. This PR also [...]
    
    (cherry picked from commit c59f8801a1093831ef354e02e0692c4ce5b8dcc0)
---
 src/Consumer.cc          | 31 +++++++++++++++++++-----
 tests/end_to_end.test.js | 63 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 91d7572..f5c3e04 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -25,6 +25,7 @@
 #include <pulsar/c/result.h>
 #include <atomic>
 #include <thread>
+#include <future>
 
 Napi::FunctionReference Consumer::constructor;
 
@@ -55,21 +56,34 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
 struct MessageListenerProxyData {
   std::shared_ptr<pulsar_message_t> cMessage;
   Consumer *consumer;
+  std::function<void(void)> callback;
 
-  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer)
-      : cMessage(cMessage), consumer(consumer) {}
+  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer,
+                           std::function<void(void)> callback)
+      : cMessage(cMessage), consumer(consumer), callback(callback) {}
 };
 
 void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Consumer *consumer = data->consumer;
-  delete data;
 
   // `consumer` might be null in certain cases, segmentation fault might happend without this null check. We
   // need to handle this rare case in future.
   if (consumer) {
-    jsCallback.Call({msg, consumer->Value()});
+    Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
+    if (ret.IsPromise()) {
+      Napi::Promise promise = ret.As<Napi::Promise>();
+      Napi::Value thenValue = promise.Get("then");
+      if (thenValue.IsFunction()) {
+        Napi::Function then = thenValue.As<Napi::Function>();
+        Napi::Function callback =
+            Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
+        then.Call(promise, {callback});
+        return;
+      }
+    }
   }
+  data->callback();
 }
 
 void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessage, void *ctx) {
@@ -82,9 +96,14 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag
     return;
   }
 
-  MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
-  listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  std::unique_ptr<MessageListenerProxyData> dataPtr(
+      new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); }));
+  listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
   listenerCallback->callback.Release();
+
+  future.wait();
 }
 
 void Consumer::SetCConsumer(std::shared_ptr<pulsar_consumer_t> cConsumer) { this->cConsumer = cConsumer; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ba4c448..373ee47 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -269,6 +269,69 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Share consumers with message listener', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'test-shared-consumer-listener';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+      });
+
+      for (let i = 0; i < 100; i += 1) {
+        await producer.send(i);
+      }
+
+      let consumer1Recv = 0;
+
+      const consumer1 = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        receiverQueueSize: 10,
+        listener: async (message, messageConsumer) => {
+          await new Promise((resolve) => setTimeout(resolve, 10));
+          consumer1Recv += 1;
+          await messageConsumer.acknowledge(message);
+        },
+      });
+
+      const consumer2 = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        receiverQueueSize: 10,
+      });
+
+      let consumer2Recv = 0;
+      while (true) {
+        await new Promise((resolve) => setTimeout(resolve, 10));
+        try {
+          const msg = await consumer2.receive(3000);
+          consumer2Recv += 1;
+          await consumer2.acknowledge(msg);
+        } catch (err) {
+          break;
+        }
+      }
+
+      // Ensure that each consumer receives at least 1 times (greater than and not equal)
+      // the receiver queue size messages.
+      // This way any of the consumers will not immediately empty all messages of a topic.
+      expect(consumer1Recv).toBeGreaterThan(10);
+      expect(consumer1Recv).toBeGreaterThan(10);
+
+      await consumer1.close();
+      await consumer2.close();
+      await producer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',