Posted to commits@pulsar.apache.org by "BewareMyPower (via GitHub)" <gi...@apache.org> on 2023/04/04 16:43:19 UTC

[GitHub] [pulsar-client-node] BewareMyPower commented on a diff in pull request #309: [Fix] Fix message listener doesn't respect receiver queue size

BewareMyPower commented on code in PR #309:
URL: https://github.com/apache/pulsar-client-node/pull/309#discussion_r1157505847


##########
src/Consumer.cc:
##########
@@ -82,9 +96,15 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag
     return;
   }
 
-  MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  MessageListenerProxyData *dataPtr =
+      new MessageListenerProxyData(cMessage, consumer, [&]() { promise.set_value(); });
   listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
   listenerCallback->callback.Release();
+
+  future.wait();
+  delete dataPtr;

Review Comment:
   It's better not to call `delete` directly; use `std::unique_ptr` instead.
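
   A minimal sketch of that suggestion, under stated assumptions: `ProxyData`, `consumeOnOtherThread`, and `deliver` are hypothetical stand-ins, and a plain `std::thread` takes the place of the `BlockingCall` into the JS thread. The `std::unique_ptr` owns the data for the whole wait, so no explicit `delete` is needed.

   ```cpp
   #include <functional>
   #include <future>
   #include <memory>
   #include <thread>

   // Hypothetical stand-in for MessageListenerProxyData; not the real struct.
   struct ProxyData {
     int payload;
     std::function<void()> callback;
   };

   // Hypothetical stand-in for the work done on the other (JS) thread.
   int consumeOnOtherThread(ProxyData *data) {
     int seen = data->payload;
     data->callback();  // signal completion, as the listener proxy would
     return seen;
   }

   int deliver(int payload) {
     std::promise<void> promise;
     std::future<void> future = promise.get_future();

     // The unique_ptr owns the data; the worker only borrows a raw pointer.
     std::unique_ptr<ProxyData> data(
         new ProxyData{payload, [&promise]() { promise.set_value(); }});

     int seen = 0;
     std::thread worker([&seen, raw = data.get()]() { seen = consumeOnOtherThread(raw); });

     future.wait();  // block until the callback has fired
     worker.join();  // make sure the worker no longer touches `data`
     return seen;    // `data` is released here, on every return path
   }
   ```

   The raw pointer from `data.get()` is only borrowed; ownership never leaves `deliver`, which is why the wait and the join must come before the function returns.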



##########
src/Consumer.cc:
##########
@@ -55,21 +56,34 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
 struct MessageListenerProxyData {
   std::shared_ptr<pulsar_message_t> cMessage;
   Consumer *consumer;
+  std::function<void(void)> callback;
 
-  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer)
-      : cMessage(cMessage), consumer(consumer) {}
+  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Consumer *consumer,
+                           std::function<void(void)> callback)
+      : cMessage(cMessage), consumer(consumer), callback(callback) {}
 };
 
 void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Consumer *consumer = data->consumer;
-  delete data;

Review Comment:
   I think we should still delete the data here. Replacing this line with `std::unique_ptr<MessageListenerProxyData> data_guard{data};` is a commonly used RAII solution.
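
   A rough sketch of the guard pattern, assuming the callback is the sole owner of the pointer it receives. `ProxyData` and `proxy` are hypothetical names, and the `destroyed` counter exists only to make the destruction observable.

   ```cpp
   #include <memory>

   // Hypothetical stand-in for MessageListenerProxyData.
   struct ProxyData {
     int payload;
     int *destroyed;  // illustration only: counts destructor calls
     ~ProxyData() { ++*destroyed; }
   };

   // Hypothetical proxy callback that receives ownership of `data` as a raw pointer.
   int proxy(ProxyData *data) {
     std::unique_ptr<ProxyData> guard(data);  // adopts the pointer immediately

     if (guard->payload < 0) {
       return -1;  // early return: the guard still frees `data`
     }
     return guard->payload * 2;  // normal return: freed here as well
   }
   ```

   Note that the template argument is the pointee type, not the pointer type. If another code path also deletes the same object, the guard would cause a double free, so only one side should own it.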



##########
src/Consumer.cc:
##########
@@ -82,9 +96,15 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag
     return;
   }
 
-  MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, consumer);
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  MessageListenerProxyData *dataPtr =
+      new MessageListenerProxyData(cMessage, consumer, [&]() { promise.set_value(); });

Review Comment:
   ```suggestion
         new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); });
   ```
   
   Capture only the specific variable you need, by value or by reference, instead of capturing everything with `[=]` or `[&]`.
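
   To illustrate the difference, here is a small sketch with hypothetical names (`signalOnce`, `unrelated`, `done`). With the explicit capture list, the lambda can only touch `promise`.

   ```cpp
   #include <chrono>
   #include <functional>
   #include <future>

   bool signalOnce() {
     std::promise<void> promise;
     std::future<void> future = promise.get_future();
     int unrelated = 42;  // deliberately not captured

     // Explicit capture: using `unrelated` in the body would fail to compile,
     // whereas a blanket [&] would silently allow it.
     std::function<void()> done = [&promise]() { promise.set_value(); };
     done();

     (void)unrelated;
     // The future is ready as soon as the callback has run.
     return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
   }
   ```

   The explicit list also documents which local variables must outlive the callback, which matters here because `promise` lives on the stack of the listener.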


