You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/04/11 14:16:36 UTC

[pulsar-client-node] 01/02: [Fix] Fix message listener doesn't respect receiver queue size (#309)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit c2a506c223959a364e6464aa3a9254d59ec21646
Author: Zike Yang <zi...@apache.org>
AuthorDate: Mon Apr 10 21:01:02 2023 +0800

    [Fix] Fix message listener doesn't respect receiver queue size (#309)
    
    Fixes #308
    
    ### Motivation
    
    This bug is in the Node.js client, not the C++ client. The root cause is that the message listener invokes the user callback asynchronously: it does not wait for the user callback to finish, so it moves on to process the next messages immediately.
    
    ### Modifications
    
    * Use Napi::Promise to wait for the user callback to complete in the message listener.
    
    The message listener can detect whether the user function is asynchronous or synchronous.
    * If it is a synchronous function, it will behave the same as before. This is not an issue because `Napi::Function::Call` will wait for the synchronous function to complete.
    * If it is an asynchronous function, the user function must return a `Promise` object. We use `promise.then()` to determine when the user function has finished. The message listener waits in the C++ client thread; we should not wait in the Node thread because that would block the entire Node main thread. This way, we can also utilize the feature of `MessageListenerThreads`. Previously, the configuration `MessageListenerThreads` did not work for the asynchronous callback. This PR also [...]
    
    (cherry picked from commit c59f8801a1093831ef354e02e0692c4ce5b8dcc0)
---
 src/Consumer.cc          | 31 +++++++++++++++++++-----
 tests/end_to_end.test.js | 63 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 91d7572..f5c3e04 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -25,6 +25,7 @@
 #include <pulsar/c/result.h>
 #include <atomic>
 #include <thread>
+#include <future>
 
 Napi::FunctionReference Consumer::constructor;
 
@@ -55,21 +56,34 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
 struct MessageListenerProxyData {
   std::shared_ptr<pulsar_message_t> cMessage;
   Consumer *consumer;
+  std::function<void(void)> callback;
 
-  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer)
-      : cMessage(cMessage), consumer(consumer) {}
+  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer,
+                           std::function<void(void)> callback)
+      : cMessage(cMessage), consumer(consumer), callback(callback) {}
 };
 
 void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Consumer *consumer = data->consumer;
-  delete data;
 
   // `consumer` might be null in certain cases, segmentation fault might happend without this null check. We
   // need to handle this rare case in future.
   if (consumer) {
-    jsCallback.Call({msg, consumer->Value()});
+    Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
+    if (ret.IsPromise()) {
+      Napi::Promise promise = ret.As<Napi::Promise>();
+      Napi::Value thenValue = promise.Get("then");
+      if (thenValue.IsFunction()) {
+        Napi::Function then = thenValue.As<Napi::Function>();
+        Napi::Function callback =
+            Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
+        then.Call(promise, {callback});
+        return;
+      }
+    }
   }
+  data->callback();
 }
 
 void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessage, void *ctx) {
@@ -82,9 +96,14 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag
     return;
   }
 
-  MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
-  listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  std::unique_ptr<MessageListenerProxyData> dataPtr(
+      new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); }));
+  listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
   listenerCallback->callback.Release();
+
+  future.wait();
 }
 
 void Consumer::SetCConsumer(std::shared_ptr<pulsar_consumer_t> cConsumer) { this->cConsumer = cConsumer; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ba4c448..373ee47 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -269,6 +269,69 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Share consumers with message listener', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'test-shared-consumer-listener';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+      });
+
+      for (let i = 0; i < 100; i += 1) {
+        await producer.send(i);
+      }
+
+      let consumer1Recv = 0;
+
+      const consumer1 = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        receiverQueueSize: 10,
+        listener: async (message, messageConsumer) => {
+          await new Promise((resolve) => setTimeout(resolve, 10));
+          consumer1Recv += 1;
+          await messageConsumer.acknowledge(message);
+        },
+      });
+
+      const consumer2 = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        receiverQueueSize: 10,
+      });
+
+      let consumer2Recv = 0;
+      while (true) {
+        await new Promise((resolve) => setTimeout(resolve, 10));
+        try {
+          const msg = await consumer2.receive(3000);
+          consumer2Recv += 1;
+          await consumer2.acknowledge(msg);
+        } catch (err) {
+          break;
+        }
+      }
+
+      // Ensure that each consumer receives at least 1 times (greater than and not equal)
+      // the receiver queue size messages.
+      // This way any of the consumers will not immediately empty all messages of a topic.
+      expect(consumer1Recv).toBeGreaterThan(10);
+      expect(consumer1Recv).toBeGreaterThan(10);
+
+      await consumer1.close();
+      await consumer2.close();
+      await producer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',